Documentation

Python Example

Full Python Worker SDK example.

Source: https://github.com/relaymesh/relaymesh/blob/main/examples/python/worker/main.py

Install

pip install relaymesh
import os
import signal
import threading
from typing import Any, cast

import relaymesh as githook_sdk
from relaymesh import (
    Listener,
    New,
    WithConcurrency,
    WithClientProvider,
    WithEndpoint,
    WithListener,
    WithLogger,
    GitLabClient,
    BitbucketClient,
    GitHubClient,
    NewRemoteSCMClientProvider,
)

stop = threading.Event()


def shutdown(_signum, _frame):
    stop.set()


signal.signal(signal.SIGINT, shutdown)
signal.signal(signal.SIGTERM, shutdown)

endpoint = os.getenv("GITHOOK_ENDPOINT", "https://relaymesh.vercel.app/api/connect")
rule_id = os.getenv("GITHOOK_RULE_ID", "85101e9f-3bcf-4ed0-b561-750c270ef6c3")


def int_from_env(name, fallback):
    raw = (os.getenv(name) or "").strip()
    if not raw:
        return fallback
    try:
        value = int(raw)
    except Exception:
        return fallback
    return value if value > 0 else fallback


concurrency = int_from_env("GITHOOK_CONCURRENCY", 4)
retry_count = int_from_env("GITHOOK_RETRY_COUNT", 1)


class ExampleLogger:
    def printf(self, fmt, *args):
        rendered = fmt
        for arg in args:
            rendered = rendered.replace("%s", str(arg), 1)
        print(f"example-worker {rendered}")

    def Printf(self, fmt, *args):
        self.printf(fmt, *args)


class ExampleListener(Listener):
    def on_message_start(self, ctx, evt):
        print(
            f"listener start log_id={evt.metadata.get('log_id', '')} provider={evt.provider} topic={evt.topic}"
        )

    def on_message_finish(self, ctx, evt, err=None):
        status = "failed" if err else "success"
        print(
            f"listener finish log_id={evt.metadata.get('log_id', '')} status={status} err={err or ''}"
        )

    def on_error(self, ctx, evt, err):
        log_id = ""
        provider = ""
        if evt is not None:
            provider = evt.provider
            log_id = evt.metadata.get("log_id", "")
        print(f"listener error log_id={log_id} provider={provider} err={err}")


options = [
    WithEndpoint(endpoint),
    WithClientProvider(NewRemoteSCMClientProvider()),
    WithConcurrency(concurrency),
    WithLogger(ExampleLogger()),
    WithListener(ExampleListener()),
]
with_retry_count = getattr(githook_sdk, "WithRetryCount", None)
if callable(with_retry_count):
    retry_option = cast(Any, with_retry_count(retry_count))
    wk = New(*options, retry_option)
else:
    wk = New(*options)


def repository_from_event(evt):
    normalized = getattr(evt, "normalized", None)
    if not normalized or not isinstance(normalized, dict):
        return "", ""
    repo_value = normalized.get("repository")
    if not isinstance(repo_value, dict):
        return "", ""
    full_name = repo_value.get("full_name", "")
    if isinstance(full_name, str) and "/" in full_name:
        parts = full_name.strip().split("/", 1)
        if len(parts) == 2 and parts[0] and parts[1]:
            return parts[0], parts[1]
    name = repo_value.get("name", "")
    owner_map = repo_value.get("owner")
    owner = owner_map.get("login", "") if isinstance(owner_map, dict) else ""
    return str(owner).strip(), str(name).strip()


def handle(ctx, evt):
    provider_name = (evt.provider or "").strip().lower()
    print(
        f"handler topic={evt.topic} provider={provider_name} type={evt.type} retry_count={retry_count} concurrency={concurrency}"
    )

    if provider_name == "github":
        gh = GitHubClient(evt)
        if not gh:
            print("github client not available (installation may not be configured)")
            return
        owner, repo = repository_from_event(evt)
        if not owner or not repo:
            print("repository info missing in payload; skipping github read")
            return
        try:
            repository = gh.request_json("GET", f"/repos/{owner}/{repo}")
            if isinstance(repository, dict):
                full_name = repository.get("full_name")
                private = repository.get("private")
                default_branch = repository.get("default_branch")
            else:
                full_name = None
                private = None
                default_branch = None
            print(
                f"github read ok full_name={full_name} "
                f"private={private} "
                f"default_branch={default_branch}"
            )
        except Exception as err:
            print(f"github read failed owner={owner} repo={repo} err={err}")
        return

    if provider_name == "gitlab":
        gl = GitLabClient(evt)
        if not gl:
            print("gitlab client not available (installation may not be configured)")
            return
        try:
            user = gl.request_json("GET", "/user")
            username = user.get("username") if isinstance(user, dict) else None
            print(f"gitlab read ok username={username}")
        except Exception as err:
            print(f"gitlab read failed err={err}")
        return

    if provider_name == "bitbucket":
        bb = BitbucketClient(evt)
        if not bb:
            print("bitbucket client not available (installation may not be configured)")
            return
        try:
            user = bb.request_json("GET", "/user")
            username = user.get("username") if isinstance(user, dict) else None
            print(f"bitbucket read ok username={username}")
        except Exception as err:
            print(f"bitbucket read failed err={err}")
        return

    print(f"unsupported provider={provider_name}; skipping scm call")


wk.HandleRule(rule_id, handle)

wk.Run(stop)