import hashlib
import hmac
import json
import logging
import os
import sys
import requests
from flask import Flask, request, jsonify
# Flask application object; the @app.route decorators in this file register
# the HTTP handlers on it, and the __main__ block runs it.
app = Flask(__name__)
# --- Configuration ---
# Every setting is read once from the environment when the module is imported.
# Settings without a default are None when the variable is unset.

# Shared secret used to verify the BTCPay webhook signature header.
BTCPAY_WEBHOOK_SECRET = os.getenv("BTCPAY_WEBHOOK_SECRET")
# Token that callers of the /test-email endpoint must present.
TEST_TOKEN = os.getenv("TEST_TOKEN")
# Base URL of the BTCPay server. Paths are appended as "/api/..." and "/i/...",
# so a trailing slash in the configured value is stripped to avoid "//" in
# the resulting URLs.
BTCPAY_URL = os.getenv("BTCPAY_URL", "https://payment.nxtgroup.org").rstrip("/")
# API key sent in the Authorization header of BTCPay API requests.
BTCPAY_API_KEY = os.getenv("BTCPAY_API_KEY")
# Postmark server token; emails cannot be sent without it.
POSTMARK_API_KEY = os.getenv("POSTMARK_API_KEY")
# Optional display name; when set, the sender becomes "FROM_NAME <FROM_EMAIL>".
FROM_NAME = os.getenv("FROM_NAME", "")
FROM_EMAIL = os.getenv("FROM_EMAIL", "billing@nxtgroup.org")
# Bcc value passed on every outgoing email; its first comma-separated entry
# also serves as a fallback recipient for webhook test emails.
BCC_EMAIL = os.getenv("BCC_EMAIL", "admin@nxtgroup.org")
# Preferred recipient for emails triggered by BTCPay test webhooks.
WEBHOOK_TEST_EMAIL = os.getenv("WEBHOOK_TEST_EMAIL")
# Debug switch: "true", "1" or "yes" (case-insensitive) enables debug logging.
DEBUG = os.getenv("DEBUG", "false").lower() in ("true", "1", "yes")
# Banner text used in the subject and body of test emails.
TEST_EMAIL_BANNER = "THIS IS A TEST TRANSACTIONAL EMAIL, PLEASE IGNORE."
# Service logger: DEBUG verbosity when the DEBUG flag is on, INFO otherwise.
logger = logging.getLogger("btcpaymailer")
if DEBUG:
    logger.setLevel(logging.DEBUG)
else:
    logger.setLevel(logging.INFO)
# Attach the stdout handler only when the logger has none yet, so it never
# ends up with more than one handler from this setup code.
if not logger.handlers:
    _handler = logging.StreamHandler(sys.stdout)
    logger.addHandler(_handler)
    _handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
def log_debug(message, **fields):
    """Log ``message`` at DEBUG level; does nothing unless DEBUG is enabled.

    Keyword ``fields``, when given, are appended to the message as a JSON
    object after a " | " separator. Values that JSON cannot encode are
    converted with ``str``.
    """
    if not DEBUG:
        return
    suffix = ""
    if fields:
        encoded = json.dumps(fields, default=str)
        suffix = f" | {encoded}"
    logger.debug(f"{message}{suffix}")
def log_evaluation(message, **fields):
    """Log ``message`` at INFO level with an "[evaluation]" prefix.

    Keyword ``fields``, when given, are appended as a JSON object after a
    " | " separator. Values that JSON cannot encode are converted with ``str``.
    """
    suffix = ""
    if fields:
        encoded = json.dumps(fields, default=str)
        suffix = f" | {encoded}"
    logger.info(f"[evaluation] {message}{suffix}")
def get_btcpay_sig_header():
    """Return the value of the current request's "btcpay-sig" header.

    The header name is matched case-insensitively. Returns None when the
    request carries no such header.
    """
    matches = (
        header_value
        for header_name, header_value in request.headers.items()
        if header_name.lower() == "btcpay-sig"
    )
    # First match wins; None when the generator yields nothing.
    return next(matches, None)
def verify_btcpay_webhook(body: bytes, signature: str | None, secret: str | None) -> bool:
if not body or not signature or not secret:
return False
expected = "sha256=" + hmac.new(
secret.encode("utf-8"),
body,
hashlib.sha256,
).hexdigest()
return hmac.compare_digest(expected, signature)
def is_btcpay_test_invoice(invoice_id):
    """Return True when ``invoice_id`` is non-empty and contains "__test__"."""
    if not invoice_id:
        return False
    return "__test__" in invoice_id
def get_webhook_test_recipient():
    """Pick the address that receives emails triggered by BTCPay test webhooks.

    Preference order: WEBHOOK_TEST_EMAIL, then the first comma-separated
    entry of BCC_EMAIL (whitespace-trimmed), then FROM_EMAIL.
    """
    if WEBHOOK_TEST_EMAIL:
        return WEBHOOK_TEST_EMAIL
    if not BCC_EMAIL:
        return FROM_EMAIL
    first_bcc, _, _ = BCC_EMAIL.partition(",")
    return first_bcc.strip()
def get_from_address():
    """Return the sender address: "FROM_NAME <FROM_EMAIL>" if a name is set, else FROM_EMAIL."""
    return f"{FROM_NAME} <{FROM_EMAIL}>" if FROM_NAME else FROM_EMAIL
def btcpay_headers():
    """Build the HTTP headers for BTCPay API calls (token-style Authorization)."""
    authorization = f"token {BTCPAY_API_KEY}"
    return {"Authorization": authorization}
def format_crypto_amount(value):
    """Format ``value`` with up to 8 decimal places and no trailing zeros.

    ``value`` is converted with ``float``. Trailing zeros are removed, and
    the decimal point is dropped too when no fractional digits remain
    (e.g. 100 -> "100").
    """
    fixed = format(float(value), ".8f")
    without_zeros = fixed.rstrip("0")
    return without_zeros.rstrip(".")
def pick_btc_payment_method(payment_methods):
    """Choose the payment-method entry to read Bitcoin amounts from.

    Selection order (comparisons are case-insensitive):
      1. the first entry whose "paymentMethodId" or "currency" is exactly "BTC";
      2. otherwise the first entry whose "paymentMethodId" starts with "BTC";
      3. otherwise the first entry in the list.

    Returns None when ``payment_methods`` is empty or None.
    """
    if not payment_methods:
        return None

    def _upper(entry, key):
        # Missing or null values are treated as the empty string.
        return (entry.get(key) or "").upper()

    exact_match = next(
        (
            entry
            for entry in payment_methods
            if "BTC" in (_upper(entry, "paymentMethodId"), _upper(entry, "currency"))
        ),
        None,
    )
    if exact_match is not None:
        return exact_match

    prefix_match = next(
        (
            entry
            for entry in payment_methods
            if _upper(entry, "paymentMethodId").startswith("BTC")
        ),
        None,
    )
    if prefix_match is not None:
        return prefix_match

    return payment_methods[0]
def extract_payment_amounts(invoice_data, payment_methods):
    """Resolve total, paid and remaining amounts for an invoice.

    Amounts come from the entry chosen by ``pick_btc_payment_method`` when
    one is available ("payment_method" source). Otherwise they fall back to
    the invoice-level "amount"/"amountPaid"/"currency" fields
    ("invoice_fallback" source).

    Returns:
        dict with keys "amount", "amount_paid", "remaining", "currency",
        "source" and "payment_method_id" (None for the invoice fallback).
    """
    method = pick_btc_payment_method(payment_methods)

    if not method:
        total = float(invoice_data.get("amount", 0))
        paid = float(invoice_data.get("amountPaid", 0))
        return {
            "amount": total,
            "amount_paid": paid,
            "remaining": round(total - paid, 8) if paid < total else 0,
            "currency": invoice_data.get("currency", ""),
            "source": "invoice_fallback",
            "payment_method_id": None,
        }

    total = float(method.get("amount", 0))
    paid = float(method.get("totalPaid", 0))
    outstanding = float(method.get("due", 0))
    # When "due" is not positive but less than the total was paid, derive
    # the remaining balance from total minus paid instead.
    if outstanding <= 0 and total > paid:
        outstanding = round(total - paid, 8)
    return {
        "amount": total,
        "amount_paid": paid,
        "remaining": outstanding,
        "currency": method.get("currency") or "BTC",
        "source": "payment_method",
        "payment_method_id": method.get("paymentMethodId"),
    }
def fetch_btcpay_invoice(store_id, invoice_id, timeout=10):
    """GET a single invoice from the BTCPay API.

    Args:
        store_id: Store identifier used in the request path.
        invoice_id: Invoice identifier used in the request path.
        timeout: Seconds to wait for the server before giving up. Without
            a timeout, ``requests`` waits indefinitely and an unresponsive
            server would block the calling worker.

    Returns:
        The ``requests`` response object; callers inspect ``status_code``.

    Raises:
        requests.RequestException: On connection failure or timeout.
    """
    return requests.get(
        f"{BTCPAY_URL}/api/v1/stores/{store_id}/invoices/{invoice_id}",
        headers=btcpay_headers(),
        timeout=timeout,
    )
def fetch_btcpay_payment_methods(store_id, invoice_id, timeout=10):
    """GET the payment methods of an invoice from the BTCPay API.

    Args:
        store_id: Store identifier used in the request path.
        invoice_id: Invoice identifier used in the request path.
        timeout: Seconds to wait for the server before giving up. Without
            a timeout, ``requests`` waits indefinitely and an unresponsive
            server would block the calling worker.

    Returns:
        The ``requests`` response object; callers inspect ``status_code``.

    Raises:
        requests.RequestException: On connection failure or timeout.
    """
    return requests.get(
        f"{BTCPAY_URL}/api/v1/stores/{store_id}/invoices/{invoice_id}/payment-methods",
        headers=btcpay_headers(),
        timeout=timeout,
    )
def log_invoice_api_response(invoice_id, invoice_data, payment_methods, amounts):
    """Log what the BTCPay API returned and which amounts were resolved.

    Emits two debug lines (an invoice summary and a per-method summary of
    the payment methods) followed by one evaluation line carrying the
    resolved ``amounts``.
    """
    method_fields = ("paymentMethodId", "currency", "amount", "totalPaid", "due")
    payment_summary = []
    for entry in payment_methods or []:
        payment_summary.append({field: entry.get(field) for field in method_fields})

    log_debug(
        "BTCPay invoice API response",
        invoice_id=invoice_id,
        status=invoice_data.get("status"),
        additional_status=invoice_data.get("additionalStatus"),
        fiat_amount=invoice_data.get("amount"),
        fiat_amount_paid=invoice_data.get("amountPaid"),
        fiat_currency=invoice_data.get("currency"),
    )
    log_debug(
        "BTCPay payment-methods API response",
        invoice_id=invoice_id,
        payment_methods=payment_summary,
    )
    log_evaluation(
        "amounts resolved for email",
        invoice_id=invoice_id,
        source=amounts["source"],
        payment_method_id=amounts["payment_method_id"],
        total=amounts["amount"],
        paid=amounts["amount_paid"],
        remaining=amounts["remaining"],
        currency=amounts["currency"],
    )
def get_html_template(order_id, invoice_id, amount, currency, amount_paid, remaining, test_mode=False):
    """Render the body of the partial-payment notification email.

    Args:
        order_id: Order reference shown to the buyer.
        invoice_id: BTCPay invoice id, used to build the invoice link.
        amount: Total amount due.
        currency: Currency label printed after each amount.
        amount_paid: Amount received so far.
        remaining: Outstanding balance.
        test_mode: When True, TEST_EMAIL_BANNER is placed at the top.

    Returns:
        The rendered body as a string. Amounts are formatted with
        ``format_crypto_amount``.
    """
    import html  # function-scope import: only this function needs it

    total_display = format_crypto_amount(amount)
    paid_display = format_crypto_amount(amount_paid)
    remaining_display = format_crypto_amount(remaining)
    # The result is delivered as an HTML body, and these values originate
    # from invoice data rather than from this file, so escape them to keep
    # markup characters in them from being interpreted by the mail client.
    order_id = html.escape(str(order_id))
    invoice_id = html.escape(str(invoice_id))
    currency = html.escape(str(currency))
    test_banner = ""
    if test_mode:
        test_banner = f"""
{TEST_EMAIL_BANNER}
"""
    return f"""
{test_banner}
Incomplete Payment Detected
Hi,
We received a payment for your order #{order_id}, but it does not cover the full amount required to complete the purchase.
This usually happens when you didn't enter the Bitcoin amount on the payment vendor, or didn't use the correct bitcoin amount.
Total Amount (Bitcoin): {total_display} {currency}
Amount Paid (Bitcoin): {paid_display} {currency}
Remaining Balance (Bitcoin): {remaining_display} {currency}
To ensure your order is processed, please return to the invoice and pay the remaining balance before the timer expires.
Or copy and paste this link into your browser:
{BTCPAY_URL}/i/{invoice_id}
Alternatively, you can contact us on webchat on the store you placed the order on. We can for instance, manually add a lower time amount to the subscription.
Important: If the full balance is not received before the invoice expires, the order will be cancelled and you will need to contact support for a refund of the partial amount.
"""
def send_postmark_email(to_email, subject, html_body, timeout=10):
    """Send one HTML email through the Postmark HTTP API.

    The sender comes from ``get_from_address()`` and BCC_EMAIL is passed as
    the Bcc value on every message.

    Args:
        to_email: Recipient address.
        subject: Email subject line.
        html_body: HTML content of the message.
        timeout: Seconds to wait for Postmark before giving up.

    Returns:
        dict with "ok" (True only for HTTP 200) and "status_code" (None
        when no HTTP response was obtained). On failure an "error" string
        is included: the response text, the network error, or a note that
        POSTMARK_API_KEY is not set.
    """
    if not POSTMARK_API_KEY:
        return {"ok": False, "status_code": None, "error": "POSTMARK_API_KEY is not set"}
    headers = {
        "Accept": "application/json",
        "Content-Type": "application/json",
        "X-Postmark-Server-Token": POSTMARK_API_KEY,
    }
    payload = {
        "From": get_from_address(),
        "To": to_email,
        "Bcc": BCC_EMAIL,
        "Subject": subject,
        "HtmlBody": html_body,
        "MessageStream": "outbound",
    }
    try:
        response = requests.post(
            "https://api.postmarkapp.com/email",
            json=payload,
            headers=headers,
            timeout=timeout,
        )
    except requests.RequestException as exc:
        # Report network failures through the same result shape as every
        # other failure, so callers need only check result["ok"].
        return {"ok": False, "status_code": None, "error": str(exc)}
    result = {"ok": response.status_code == 200, "status_code": response.status_code}
    if not result["ok"]:
        result["error"] = response.text
    return result
def send_test_email(to_email):
    """Send the sample partial-payment email, in test mode, to ``to_email``.

    Uses fixed placeholder values (order "TEST-001", 0.001 BTC total with
    0.0005 paid and 0.0005 remaining) and returns the result dict from
    ``send_postmark_email``.
    """
    sample_body = get_html_template(
        "TEST-001",
        "test-invoice-id",
        0.001,
        "BTC",
        0.0005,
        0.0005,
        test_mode=True,
    )
    return send_postmark_email(to_email, f"[TEST] {TEST_EMAIL_BANNER}", sample_body)
def handle_btcpay_test_webhook(invoice_id):
    """Answer a BTCPay test webhook by sending a test email via Postmark.

    The recipient comes from ``get_webhook_test_recipient()``. Returns a
    Flask ``(response, status)`` pair: 200 with a success payload when the
    email was accepted, 500 with the Postmark result otherwise.
    """
    recipient = get_webhook_test_recipient()
    log_evaluation(
        "BTCPay test invoice detected, sending Postmark test email",
        invoice_id=invoice_id,
        to=recipient,
    )
    sample_body = get_html_template(
        "TEST-001", invoice_id, 0.001, "BTC", 0.0005, 0.0005, test_mode=True
    )
    result = send_postmark_email(recipient, f"[TEST] {TEST_EMAIL_BANNER}", sample_body)

    if not result["ok"]:
        log_evaluation("test email failed", to=recipient, invoice_id=invoice_id, postmark_result=result)
        return jsonify({"error": "Failed to send test email via Postmark", "details": result}), 500

    log_evaluation("test email sent", to=recipient, invoice_id=invoice_id, postmark_status=result["status_code"])
    success_payload = {
        "status": "success",
        "message": "BTCPay test webhook: Postmark test email sent",
        "to": recipient,
    }
    return jsonify(success_payload), 200
def evaluate_partial_payment(invoice_data, invoice_id, amounts):
    """Decide whether an invoice warrants a partial-payment email.

    An invoice qualifies when its status is 'New' and the paid amount is
    greater than zero but less than the total.

    Args:
        invoice_data: Invoice dict; "status" and "metadata" are read.
        invoice_id: Invoice id, also the fallback order reference.
        amounts: Dict with "amount", "amount_paid", "currency",
            "remaining", "source" and "payment_method_id".

    Returns:
        ``(action, evaluation)`` where ``action`` is "send",
        "missing_email" (qualifies but has no buyer email) or None
        (does not qualify), and ``evaluation`` is a dict of the inputs
        and individual check results, which is also logged.
    """
    amount = amounts["amount"]
    amount_paid = amounts["amount_paid"]
    currency = amounts["currency"]
    remaining = amounts["remaining"]
    status = invoice_data.get('status', '')
    # "metadata" may be present but null; treat that like an empty object
    # instead of failing on None.get(...).
    metadata = invoice_data.get('metadata') or {}
    buyer_email = metadata.get('buyerEmail')
    # Fall back to the invoice id when orderId is missing, null or empty so
    # the email never refers to "order #None".
    order_id = metadata.get('orderId')
    if order_id is None or order_id == "":
        order_id = invoice_id
    is_new = status == 'New'
    is_partial = 0 < amount_paid < amount
    qualifies = is_new and is_partial
    evaluation = {
        "invoice_id": invoice_id,
        "order_id": order_id,
        "status": status,
        "amount": amount,
        "amount_paid": amount_paid,
        "currency": currency,
        "remaining": remaining,
        "amount_source": amounts["source"],
        "payment_method_id": amounts["payment_method_id"],
        "checks": {
            "status_is_new": is_new,
            "amount_paid_gt_zero": amount_paid > 0,
            "amount_paid_lt_total": amount_paid < amount,
            "is_partial_payment": is_partial,
            "qualifies_for_email": qualifies,
            "has_buyer_email": bool(buyer_email),
        },
        "buyer_email": buyer_email,
    }
    log_evaluation("invoice evaluated", **evaluation)
    if not qualifies:
        # Collect every reason that applies so the log explains the decision.
        reason = []
        if not is_new:
            reason.append(f"status is '{status}', expected 'New'")
        if amount_paid <= 0:
            reason.append("no payment received yet")
        if amount_paid >= amount:
            reason.append("payment meets or exceeds total")
        log_evaluation("no email sent", decision="ignored", reasons=reason)
        return None, evaluation
    if not buyer_email:
        log_evaluation("no email sent", decision="rejected", reason="missing metadata.buyerEmail")
        return "missing_email", evaluation
    return "send", evaluation
def _log_btcpay_api_error(endpoint, store_id, invoice_id, **details):
    """Log a failed BTCPay API call as one JSON-encoded ERROR line."""
    logger.error(
        "BTCPay API error | %s",
        json.dumps({
            "endpoint": endpoint,
            "store_id": store_id,
            "invoice_id": invoice_id,
            **details,
        }),
    )


def _fetch_btcpay_json(endpoint, fetch, store_id, invoice_id):
    """Call a BTCPay fetch helper and return ``(ok, decoded_json)``; failures are logged."""
    try:
        response = fetch(store_id, invoice_id)
    except requests.RequestException as exc:
        _log_btcpay_api_error(endpoint, store_id, invoice_id, error=str(exc))
        return False, None
    if response.status_code != 200:
        _log_btcpay_api_error(
            endpoint,
            store_id,
            invoice_id,
            status_code=response.status_code,
            body=response.text,
        )
        return False, None
    try:
        return True, response.json()
    except ValueError as exc:
        # A 200 response whose body is not JSON is treated as a failed call.
        _log_btcpay_api_error(endpoint, store_id, invoice_id, error=f"invalid JSON body: {exc}")
        return False, None


@app.route('/btcpay-webhook', methods=['POST'])
def btcpay_webhook():
    """Handle BTCPay webhooks and email buyers about partial payments.

    Flow: verify the signature header against the raw body, ignore every
    event other than 'InvoiceReceivedPayment', short-circuit BTCPay test
    invoices to a test email, then fetch the invoice and its payment
    methods, evaluate them, and send the partial-payment email if the
    invoice qualifies.

    Responses:
        401 invalid signature; 400 invalid body, missing ids, or missing
        buyer email; 200 ignored event/invoice or email sent; 500 BTCPay
        or Postmark failure.
    """
    raw_body = request.get_data()
    signature = get_btcpay_sig_header()
    if not verify_btcpay_webhook(raw_body, signature, BTCPAY_WEBHOOK_SECRET):
        log_debug("webhook rejected", reason="invalid BTCPAY-SIG signature")
        return jsonify({"error": "Unauthorized"}), 401
    try:
        data = json.loads(raw_body) if raw_body else {}
    except ValueError:
        # ValueError covers json.JSONDecodeError and the UnicodeDecodeError
        # raised for bodies that are not valid UTF-8.
        log_debug("webhook rejected", reason="invalid JSON body")
        return jsonify({"error": "Invalid JSON"}), 400
    if not isinstance(data, dict):
        # Valid JSON that is not an object (e.g. a list) has no event fields.
        log_debug("webhook rejected", reason="JSON body is not an object")
        return jsonify({"error": "Invalid JSON"}), 400

    event_type = data.get('type')
    log_debug(
        "webhook received",
        event_type=event_type,
        store_id=data.get('storeId'),
        invoice_id=data.get('invoiceId'),
        delivery_id=data.get('deliveryId'),
        is_redelivery=data.get('isRedelivery'),
        payload=data,
    )
    if event_type != 'InvoiceReceivedPayment':
        log_debug("webhook ignored", reason="not InvoiceReceivedPayment", event_type=event_type)
        return jsonify({"status": "ignored", "reason": "Not a payment event"}), 200

    store_id = data.get('storeId')
    invoice_id = data.get('invoiceId')
    if is_btcpay_test_invoice(invoice_id):
        return handle_btcpay_test_webhook(invoice_id)
    if not store_id or not invoice_id:
        # Without both ids the API paths below cannot be built meaningfully.
        log_debug("webhook rejected", reason="missing storeId or invoiceId")
        return jsonify({"error": "Missing storeId or invoiceId"}), 400

    log_debug("fetching invoice from BTCPay", store_id=store_id, invoice_id=invoice_id)
    ok, invoice_data = _fetch_btcpay_json("invoice", fetch_btcpay_invoice, store_id, invoice_id)
    if not ok:
        return jsonify({"error": "Failed to fetch invoice"}), 500
    ok, payment_methods = _fetch_btcpay_json(
        "payment-methods", fetch_btcpay_payment_methods, store_id, invoice_id
    )
    if not ok:
        return jsonify({"error": "Failed to fetch invoice payment methods"}), 500

    amounts = extract_payment_amounts(invoice_data, payment_methods)
    log_invoice_api_response(invoice_id, invoice_data, payment_methods, amounts)
    action, evaluation = evaluate_partial_payment(invoice_data, invoice_id, amounts)
    if action is None:
        return jsonify({"status": "ignored", "reason": "Payment meets total or invoice not in New state"}), 200
    if action == "missing_email":
        return jsonify({"error": "No buyer email associated with invoice"}), 400

    buyer_email = evaluation["buyer_email"]
    order_id = evaluation["order_id"]
    remaining = evaluation["remaining"]
    amount = evaluation["amount"]
    amount_paid = evaluation["amount_paid"]
    currency = evaluation["currency"]
    html_body = get_html_template(order_id, invoice_id, amount, currency, amount_paid, remaining)
    subject = f"Action Required: Partial payment received for Order #{order_id}"
    log_evaluation("sending email", to=buyer_email, order_id=order_id, bcc=BCC_EMAIL)
    result = send_postmark_email(buyer_email, subject, html_body)
    if result["ok"]:
        log_evaluation("email sent", to=buyer_email, order_id=order_id, postmark_status=result["status_code"])
        return jsonify({"status": "success", "message": "Partial payment email sent"}), 200
    log_evaluation("email failed", to=buyer_email, order_id=order_id, postmark_result=result)
    return jsonify({"error": "Failed to send email via Postmark", "details": result}), 500
@app.route('/test-email', methods=['POST'])
def test_email():
    """Send a test email to the address given by the caller.

    Requires a ``token`` query parameter equal to TEST_TOKEN. The recipient
    is taken from the ``to`` query parameter, or from the "to" field of a
    JSON object body when the query parameter is absent.

    Responses:
        401 missing/incorrect token (or TEST_TOKEN unset); 400 no
        recipient; 200 email sent; 500 Postmark failure.
    """
    token = request.args.get('token')
    # Constant-time comparison on bytes: avoids leaking the token through
    # timing and works for non-ASCII input.
    if not TEST_TOKEN or not token or not hmac.compare_digest(
        token.encode("utf-8"), TEST_TOKEN.encode("utf-8")
    ):
        return jsonify({"error": "Unauthorized"}), 401
    # The query parameter applies whatever the content type. The original
    # one-line conditional expression discarded it for non-JSON requests.
    to_email = request.args.get('to')
    if not to_email and request.is_json:
        body = request.json
        if isinstance(body, dict):
            to_email = body.get('to')
    if not to_email:
        return jsonify({"error": "Missing 'to' email address"}), 400
    result = send_test_email(to_email)
    if result["ok"]:
        logger.info("test email sent | %s", json.dumps({"to": to_email, "from": get_from_address(), "bcc": BCC_EMAIL}))
        return jsonify({
            "status": "success",
            "message": f"Test email sent to {to_email}",
            "from": get_from_address(),
            "bcc": BCC_EMAIL,
        }), 200
    logger.error("test email failed | %s", json.dumps({"to": to_email, "result": result}))
    return jsonify({"error": "Failed to send test email via Postmark", "details": result}), 500
if __name__ == '__main__':
    # Usage: `<script> test-email <address>` sends one test email and exits
    # with status 0 on success, 1 on failure. Any other invocation starts
    # the Flask server on all interfaces, port 5000.
    wants_test_email = len(sys.argv) >= 3 and sys.argv[1] == 'test-email'
    if not wants_test_email:
        app.run(host='0.0.0.0', port=5000)
    else:
        to = sys.argv[2]
        outcome = send_test_email(to)
        if not outcome["ok"]:
            print(f"FAIL: {outcome}", file=sys.stderr)
            sys.exit(1)
        print(f"OK: test email sent to {to} (from {get_from_address()}, bcc {BCC_EMAIL})")
        sys.exit(0)