Skip to content

Reference

This section provides a comprehensive reference for all functions and classes in our project.

lampe

cli

commands

check_reviewed
check_reviewed(repo: Path = typer.Option(..., exists=True, file_okay=False, dir_okay=True, readable=True), repo_full_name: str | None = typer.Option(None, help='Repository full name (e.g. owner/repo)'), output: str = typer.Option('auto', help='Output provider (auto|console|github|gitlab|bitbucket)'), pr_number: int | None = typer.Option(None, '--pr', help='Pull request number (required for non-console providers)'))

Check if the token user has already reviewed this PR.

Returns exit code 0 if reviewed, 1 if not reviewed.

Source code in packages/lampe-cli/src/lampe/cli/commands/check_reviewed.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def check_reviewed(
    repo: Path = typer.Option(..., exists=True, file_okay=False, dir_okay=True, readable=True),
    repo_full_name: str | None = typer.Option(None, help="Repository full name (e.g. owner/repo)"),
    output: str = typer.Option("auto", help="Output provider (auto|console|github|gitlab|bitbucket)"),
    pr_number: int | None = typer.Option(None, "--pr", help="Pull request number (required for non-console providers)"),
):
    """Check if the token user has already reviewed this PR.

    Returns exit code 0 if reviewed, 1 if not reviewed.
    """
    initialize()

    repository = Repository(local_path=str(repo), full_name=repo_full_name)
    pull_request = PullRequest(
        number=pr_number or 0,
        title="",
        body=None,
        base_commit_hash="",
        base_branch_name="",
        head_commit_hash="",
        head_branch_name="",
    )

    try:
        provider = Provider.create_provider(provider_name=output, repository=repository, pull_request=pull_request)
    except ValueError as e:
        # Give a friendly hint when the provider rejects a missing PR number.
        message = str(e).lower()
        if "required" in message and "pr" in message:
            print(f"❌ Error: PR number is required for {output} provider. Use --pr <number>", file=sys.stderr)
            sys.exit(1)
        raise

    try:
        if provider.has_reviewed():
            print("✅ PR has already been reviewed by the token user")
            sys.exit(0)
        print("❌ PR has not been reviewed by the token user yet")
        sys.exit(1)
    except Exception as e:
        print(f"❌ Error checking if PR has been reviewed: {e}", file=sys.stderr)
        sys.exit(1)
describe
describe(repo: Path = typer.Option(..., exists=True, file_okay=False, dir_okay=True, readable=True), repo_full_name: str | None = typer.Option(None, help='Repository full name (e.g. owner/repo)'), base: str = typer.Option(..., help='Base commit SHA'), head: str = typer.Option(..., help='Head commit SHA'), title: str = typer.Option('Pull Request', help='PR title (local runs)'), output: str = typer.Option('auto', help='Output provider (auto|console|github|gitlab|bitbucket)'), variant: str = typer.Option('default', help='default|agentic'), files_exclude: list[str] | None = typer.Option(None, '--exclude'), files_reinclude: list[str] | None = typer.Option(None, '--reinclude'), truncation_tokens: int = typer.Option(DEFAULT_MAX_TOKENS, '--max-tokens'), timeout: int | None = typer.Option(None, '--timeout-seconds'), verbose: bool = typer.Option(False, '--verbose/--no-verbose'))

Generate a PR description and deliver it to the specified output provider.

Source code in packages/lampe-cli/src/lampe/cli/commands/describe.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def describe(
    repo: Path = typer.Option(..., exists=True, file_okay=False, dir_okay=True, readable=True),
    repo_full_name: str | None = typer.Option(None, help="Repository full name (e.g. owner/repo)"),
    base: str = typer.Option(..., help="Base commit SHA"),
    head: str = typer.Option(..., help="Head commit SHA"),
    title: str = typer.Option("Pull Request", help="PR title (local runs)"),
    output: str = typer.Option("auto", help="Output provider (auto|console|github|gitlab|bitbucket)"),
    variant: str = typer.Option("default", help="default|agentic"),
    files_exclude: list[str] | None = typer.Option(None, "--exclude"),
    files_reinclude: list[str] | None = typer.Option(None, "--reinclude"),
    truncation_tokens: int = typer.Option(DEFAULT_MAX_TOKENS, "--max-tokens"),
    timeout: int | None = typer.Option(None, "--timeout-seconds"),
    verbose: bool = typer.Option(False, "--verbose/--no-verbose"),
):
    """Generate a PR description and deliver it to the specified output provider."""
    initialize()

    repository = Repository(local_path=str(repo), full_name=repo_full_name)
    # Local runs have no PR number yet, hence number=0.
    pull_request = PullRequest(
        number=0,
        title=title,
        body=None,
        base_commit_hash=base,
        base_branch_name="",
        head_commit_hash=head,
        head_branch_name="",
    )

    provider = Provider.create_provider(provider_name=output, repository=repository, pull_request=pull_request)

    # Any variant other than "default" falls back to the agentic generator.
    if variant == "default":
        generator = DefaultGeneratorAdapter()
    else:
        generator = AgenticGeneratorAdapter()

    config = PRDescriptionConfig(
        files_exclude_patterns=list(files_exclude) if files_exclude else None,
        files_reinclude_patterns=list(files_reinclude) if files_reinclude else None,
        truncation_tokens=truncation_tokens,
        timeout=timeout,
        verbose=verbose,
    )

    async def _execute():
        workflow = PRDescriptionOrchestratorWorkflow(
            provider=provider, generator=generator, timeout=timeout, verbose=verbose
        )
        await workflow.run(
            start_event=PRDescriptionStart(repository=repository, pull_request=pull_request, config=config)
        )

    asyncio.run(_execute())
healthcheck
healthcheck() -> None

Check if the CLI is healthy and can connect to the configured provider.

Source code in packages/lampe-cli/src/lampe/cli/commands/healthcheck.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def healthcheck() -> None:
    """Check if the CLI is healthy and can connect to the configured provider.

    Exits with status 1 when the provider healthcheck fails or when no
    usable LLM API key is configured; otherwise logs a success message.
    """
    logger.info("🔍 Checking CLI health...")
    initialize()
    # Create dummy repository and pull request objects for testing
    repo = Repository(local_path=".", full_name="test/repo")
    pr = PullRequest(
        number=1,
        title="Test PR",
        base_commit_hash="test-base",
        base_branch_name="main",
        head_commit_hash="test-head",
        head_branch_name="feature/test",
    )

    # Initialize provider and run healthcheck
    try:
        provider: Provider = Provider.create_provider("auto", repository=repo, pull_request=pr)
        provider.healthcheck()

        # Check LLM API keys (provider-aware when LAMPE_MODEL_* env vars are set)
        logger.info("🔑 Checking LLM API keys...")
        openai_key = os.getenv("OPENAI_API_KEY")
        anthropic_key = os.getenv("ANTHROPIC_API_KEY")

        # When a LAMPE_MODEL_* variable is set, require the API key matching
        # that model's inferred provider instead of accepting any key.
        model_env_set = False
        for env_var in LAMPE_MODEL_ENV_VARS:
            model_value = os.getenv(env_var)
            if model_value and model_value.strip():
                model_env_set = True
                llm_provider = provider_from_model(model_value)
                if llm_provider == "anthropic" and not anthropic_key:
                    logger.info("❌ %s uses Anthropic but ANTHROPIC_API_KEY is not set", env_var)
                    sys.exit(1)
                if llm_provider == "openai" and not openai_key:
                    logger.info("❌ %s uses OpenAI but OPENAI_API_KEY is not set", env_var)
                    sys.exit(1)

        # No model overrides configured: at least one generic key must exist.
        if not model_env_set and not openai_key and not anthropic_key:
            logger.info("❌ No LLM API keys found")
            logger.info("   Set at least one of:")
            logger.info("   - OPENAI_API_KEY for OpenAI models")
            logger.info("   - ANTHROPIC_API_KEY for Anthropic models")
            sys.exit(1)

        if openai_key:
            logger.info("✅ OPENAI_API_KEY is set")
        if anthropic_key:
            logger.info("✅ ANTHROPIC_API_KEY is set")

        logger.info("\n🎉 All health checks passed! CLI is ready to use.")

    except Exception as e:
        # sys.exit raises SystemExit (a BaseException), so the exits above
        # are not swallowed by this handler.
        logger.exception(f"❌ Health check failed: {e}")
        sys.exit(1)
review
review(repo: Path = typer.Option(..., exists=True, file_okay=False, dir_okay=True, readable=True), repo_full_name: str | None = typer.Option(None, help='Repository full name (e.g. owner/repo)'), base: str = typer.Option(..., help='Base commit SHA'), head: str = typer.Option(..., help='Head commit SHA'), title: str = typer.Option('Pull Request', help='PR title (local runs)'), output: str = typer.Option('auto', help='Output provider (auto|console|github|gitlab|bitbucket)'), review_depth: ReviewDepth = typer.Option(ReviewDepth.STANDARD, help='Review depth (basic|standard|comprehensive)'), variant: str = typer.Option('agentic', help='Review variant: agentic (full) or quick (only critical and high issues)'), guidelines: list[str] | None = typer.Option(None, '--guideline', help='Custom review guidelines (can be repeated)'), files_exclude: list[str] | None = typer.Option(None, '--exclude'), timeout: int | None = typer.Option(None, '--timeout-seconds'), verbose: bool = typer.Option(False, '--verbose/--no-verbose'))

Generate a PR code review and deliver it to the specified output provider.

Model selection is automatic based on review_depth:

- basic: gpt-5-nano
- standard: gpt-5
- comprehensive: gpt-5.1

Source code in packages/lampe-cli/src/lampe/cli/commands/review.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def review(
    repo: Path = typer.Option(..., exists=True, file_okay=False, dir_okay=True, readable=True),
    repo_full_name: str | None = typer.Option(None, help="Repository full name (e.g. owner/repo)"),
    base: str = typer.Option(..., help="Base commit SHA"),
    head: str = typer.Option(..., help="Head commit SHA"),
    title: str = typer.Option("Pull Request", help="PR title (local runs)"),
    output: str = typer.Option("auto", help="Output provider (auto|console|github|gitlab|bitbucket)"),
    review_depth: ReviewDepth = typer.Option(ReviewDepth.STANDARD, help="Review depth (basic|standard|comprehensive)"),
    variant: str = typer.Option(
        "agentic",
        help="Review variant: agentic (full) or quick (only critical and high issues)",
    ),
    guidelines: list[str] | None = typer.Option(None, "--guideline", help="Custom review guidelines (can be repeated)"),
    files_exclude: list[str] | None = typer.Option(None, "--exclude"),
    timeout: int | None = typer.Option(None, "--timeout-seconds"),
    verbose: bool = typer.Option(False, "--verbose/--no-verbose"),
):
    """Generate a PR code review and deliver it to the specified output provider.

    Model selection is automatic based on review_depth:
    - basic: gpt-5-nano
    - standard: gpt-5
    - comprehensive: gpt-5.1
    """
    initialize()

    repository = Repository(local_path=str(repo), full_name=repo_full_name)
    # Local runs have no PR number yet, hence number=0.
    pull_request = PullRequest(
        number=0,
        title=title,
        body=None,
        base_commit_hash=base,
        base_branch_name="",
        head_commit_hash=head,
        head_branch_name="",
    )

    provider = Provider.create_provider(provider_name=output, repository=repository, pull_request=pull_request)

    # "quick" is the only alternative variant; anything else (after a
    # warning for unknown names) uses the full agentic orchestrator.
    if variant == "quick":
        generator = QuickOrchestratorAdapter()
    else:
        if variant != "agentic":
            typer.echo(f"Unknown review variant '{variant}'; using agentic.", err=True)
        generator = AgenticOrchestratorAdapter()

    config = PRReviewConfig(
        review_depth=review_depth,
        custom_guidelines=guidelines,
        files_exclude_patterns=files_exclude,
        timeout=timeout,
        verbose=verbose,
    )

    async def _execute():
        workflow = PRReviewOrchestratorWorkflow(
            provider=provider, generator=generator, timeout=timeout, verbose=verbose
        )
        await workflow.run(start_event=PRReviewStart(repository=repository, pull_request=pull_request, config=config))

    asyncio.run(_execute())

entrypoint

version() -> None

Show version information.

Source code in packages/lampe-cli/src/lampe/cli/entrypoint.py
18
19
20
21
22
23
24
25
@app.command()
def version() -> None:
    """Show version information."""
    import importlib.metadata

    # Avoid shadowing the command name with the local variable.
    cli_version = importlib.metadata.version("lampe-cli")
    logger.info(f"🔦 Lampe CLI v{cli_version}")
    logger.info("   Put some light on your codebase! ✨")

orchestrators

pr_review
AgenticOrchestratorAdapter

Uses the agentic orchestrator workflow (intent, skills, validation agents).

QuickOrchestratorAdapter

Uses the quick review workflow (single agent, grep-first, Claude 4.5 with thinking).

providers

base
PRReviewPayload(reviews: list[AgentReviewOutput]) dataclass
json_payload() -> str

Return the payload as a JSON string.

Source code in packages/lampe-cli/src/lampe/cli/providers/base.py
65
66
67
def json_payload(self) -> str:
    """Serialize every agent review to a pretty-printed JSON string."""
    dumped = [item.model_dump() for item in self.reviews]
    return json.dumps(dumped, indent=2)
Provider(repository: Repository, pull_request: PullRequest)

Bases: ABC

Abstract provider for delivering workflow outputs.

Source code in packages/lampe-cli/src/lampe/cli/providers/base.py
83
84
85
def __init__(self, repository: Repository, pull_request: PullRequest) -> None:
    """Store the repository and pull request this provider delivers output for."""
    self.pull_request = pull_request
    self.repository = repository
create_provider(provider_name: ProviderType | str, repository: Repository, pull_request: PullRequest) -> 'Provider' staticmethod

Create a provider instance based on the specified type.

Source code in packages/lampe-cli/src/lampe/cli/providers/base.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
@staticmethod
def create_provider(
    provider_name: ProviderType | str, repository: Repository, pull_request: PullRequest
) -> "Provider":
    """Create a provider instance based on the specified type.

    String names are normalized to a ProviderType, with "auto" resolved
    from the environment. Concrete providers are imported lazily.
    """
    if isinstance(provider_name, str):
        provider_name = (
            Provider.detect_provider_type() if provider_name == "auto" else ProviderType(provider_name)
        )

    if provider_name == ProviderType.CONSOLE:
        from lampe.cli.providers.console import ConsoleProvider

        return ConsoleProvider(repository=repository, pull_request=pull_request)
    if provider_name == ProviderType.GITHUB:
        from lampe.cli.providers.github import GitHubProvider

        return GitHubProvider(repository=repository, pull_request=pull_request)
    if provider_name == ProviderType.BITBUCKET:
        from lampe.cli.providers.bitbucket import BitbucketProvider

        return BitbucketProvider(repository=repository, pull_request=pull_request)
    raise ValueError(f"Provider type {provider_name} not yet implemented")
deliver_pr_description(payload: PRDescriptionPayload) -> None abstractmethod

Deliver a PR description to the configured destination.

Source code in packages/lampe-cli/src/lampe/cli/providers/base.py
87
88
89
90
@abstractmethod
def deliver_pr_description(self, payload: PRDescriptionPayload) -> None:
    """Deliver a PR description to the configured destination.

    Args:
        payload: Generated description content to publish.
    """
    ...
deliver_pr_review(payload: PRReviewPayload) -> None abstractmethod

Deliver a PR review to the configured destination.

Source code in packages/lampe-cli/src/lampe/cli/providers/base.py
92
93
94
95
@abstractmethod
def deliver_pr_review(self, payload: PRReviewPayload) -> None:
    """Deliver a PR review to the configured destination.

    Args:
        payload: Collected agent reviews to publish.
    """
    ...
detect_provider_type() -> ProviderType staticmethod

Detect the appropriate provider type based on available environment variables.

Source code in packages/lampe-cli/src/lampe/cli/providers/base.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
@staticmethod
def detect_provider_type() -> ProviderType:
    """Detect the appropriate provider type based on available environment variables."""
    # Ordered pairs: the first variable present in the environment wins.
    detection_order = (
        ("GITHUB_API_TOKEN", ProviderType.GITHUB),
        ("GITHUB_TOKEN", ProviderType.GITHUB),
        ("LAMPE_GITHUB_TOKEN", ProviderType.GITHUB),
        ("LAMPE_GITHUB_APP_ID", ProviderType.GITHUB),
        ("LAMPE_GITHUB_APP_PRIVATE_KEY", ProviderType.GITHUB),
        ("GITLAB_API_TOKEN", ProviderType.GITLAB),
        ("LAMPE_BITBUCKET_TOKEN", ProviderType.BITBUCKET),
        ("LAMPE_BITBUCKET_APP_KEY", ProviderType.BITBUCKET),
        ("BITBUCKET_WORKSPACE", ProviderType.BITBUCKET),
    )
    return next(
        (provider_type for name, provider_type in detection_order if os.getenv(name)),
        ProviderType.CONSOLE,  # fallback when no API tokens are found
    )
has_reviewed() -> bool abstractmethod

Check if the token user has already reviewed this PR.

Source code in packages/lampe-cli/src/lampe/cli/providers/base.py
102
103
104
105
@abstractmethod
def has_reviewed(self) -> bool:
    """Check if the token user has already reviewed this PR.

    Returns:
        True when a prior review/comment by the token user exists.
    """
    ...
healthcheck() -> None abstractmethod

Check if the provider is healthy and can connect to the service.

Source code in packages/lampe-cli/src/lampe/cli/providers/base.py
 97
 98
 99
100
@abstractmethod
def healthcheck(self) -> None:
    """Check if the provider is healthy and can connect to the service.

    Implementations should raise on failure rather than return a status.
    """
    ...
ProviderType

Bases: StrEnum

Available provider types.

update_or_add_text_between_tags(text: str, new_text: str, feature: str) -> str

Update the text between the tags `[](lampe-sdk-{feature}-start)` and `[](lampe-sdk-{feature}-end)` with new_text. If the tags don't exist, add them at the bottom of the text. The tags and new_text are preserved in the output.

Source code in packages/lampe-cli/src/lampe/cli/providers/base.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
def update_or_add_text_between_tags(text: str, new_text: str, feature: str) -> str:
    """
    Update the text between the tags [](lampe-sdk-{feature}-start) and [](lampe-sdk-{feature}-end)
    with new_text. If the tags don't exist, add them at the bottom of the text.
    The tags and new_text are preserved in the output.

    Args:
        text: Document to update (e.g. an existing PR description).
        new_text: Content to place between the feature tags.
        feature: Feature name embedded in the tag markers.

    Returns:
        The updated document with the tagged section replaced or appended.
    """
    start_marker = f"[](lampe-sdk-{feature}-start)"
    end_marker = f"[](lampe-sdk-{feature}-end)"

    # Escape the feature name so regex metacharacters in it cannot break
    # (or over-match) the pattern. Building the end marker directly also
    # avoids the old str.replace("-start", "") corrupting feature names
    # that themselves contain "-start".
    escaped = re.escape(feature)
    pattern = re.compile(
        rf"(\[\]\(lampe-sdk-{escaped}-start\))(.*?)\[\]\(lampe-sdk-{escaped}-end\)",
        re.DOTALL,
    )

    def replacer(match: re.Match) -> str:
        # Function replacement: no backslash processing applied to new_text.
        return f"{match.group(1)}\n{new_text}\n{end_marker}"

    # Replace only the first tagged occurrence.
    result, count = pattern.subn(replacer, text, count=1)

    # If no tags were found, add them at the bottom
    if count == 0:
        result = f"{text}\n\n{start_marker}\n{new_text}\n{end_marker}"

    return result
bitbucket
BitbucketProvider(repository: Repository, pull_request: PullRequest)

Bases: Provider

Bitbucket provider for delivering PR descriptions to Bitbucket Cloud API.

Source code in packages/lampe-cli/src/lampe/cli/providers/bitbucket.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def __init__(self, repository: Repository, pull_request: PullRequest) -> None:
    """Resolve PR number and workspace/repo slug from the environment, then set up the client."""
    if pull_request.number == 0:
        # Bitbucket Pipelines sets BITBUCKET_PR_ID; PR_NUMBER is the generic fallback.
        pr_id = os.getenv("BITBUCKET_PR_ID") or os.getenv("PR_NUMBER")
        if not pr_id:
            raise ValueError("BITBUCKET_PR_ID or PR_NUMBER environment variable is required for Bitbucket provider")
        pull_request.number = int(pr_id)

    super().__init__(repository, pull_request)

    # Workspace and repository slug come from the CI environment.
    self.workspace = os.getenv("BITBUCKET_WORKSPACE")
    self.repo_slug = os.getenv("BITBUCKET_REPO_SLUG")
    if not (self.workspace and self.repo_slug):
        raise ValueError(
            "BITBUCKET_WORKSPACE and BITBUCKET_REPO_SLUG environment variables are required for Bitbucket provider"
        )

    # The authentication strategy decides the base URL and request headers.
    self.base_url, self.auth_headers = self._initialize_bitbucket_client()
deliver_pr_description(payload: PRDescriptionPayload) -> None

Update the PR description on Bitbucket.

Source code in packages/lampe-cli/src/lampe/cli/providers/bitbucket.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
def deliver_pr_description(self, payload: PRDescriptionPayload) -> None:
    """Update the PR description on Bitbucket.

    Fetches the current description, splices the generated content between
    the lampe "description" tags, and PUTs the result back. On request or
    unexpected errors the description is logged instead of raised.
    """
    if self.pull_request.number == 0:
        raise ValueError("Cannot update Bitbucket PR description for local run")

    try:
        # Get current PR details
        pr_url = (
            f"{self.base_url}/2.0/repositories/{self.workspace}/"
            f"{self.repo_slug}/pullrequests/{self.pull_request.number}"
        )

        # Fetch current PR to get existing description
        response = requests.get(pr_url, headers=self.auth_headers)
        response.raise_for_status()
        pr_data = response.json()

        # Update description with new content; "or ''" guards a null description.
        current_description = pr_data.get("description", "") or ""
        new_description = update_or_add_text_between_tags(
            current_description, payload.description_with_title, "description"
        )

        # Update the PR
        update_data = {"description": new_description}
        update_response = requests.put(pr_url, json=update_data, headers=self.auth_headers)
        update_response.raise_for_status()

        logger.info(f"✅ Successfully updated PR #{self.pull_request.number} description on Bitbucket")
    except requests.exceptions.RequestException as e:
        logger.error(f"❌ Failed to update Bitbucket PR: {e}")
        # Fallback to console output
        logger.info("Description:")
        logger.info(payload.description)
    except Exception as e:
        logger.error(f"❌ Unexpected error updating Bitbucket PR: {e}")
        # Fallback to console output
        logger.info("Description:")
        logger.info(payload.description)
deliver_pr_review(payload: PRReviewPayload) -> None

Post PR review comments on Bitbucket.

Source code in packages/lampe-cli/src/lampe/cli/providers/bitbucket.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
def deliver_pr_review(self, payload: PRReviewPayload) -> None:
    """Post PR review comments on Bitbucket.

    For each agent review: posts a summary comment, then inline comments
    for line_comments and (non-muted) structured_comments, and finally a
    per-file summary comment when no inline comment was posted for that
    file. Individual comment failures are logged and skipped; request-level
    failures fall back to logging the review markdown.
    """
    if self.pull_request.number == 0:
        raise ValueError("Cannot post Bitbucket PR review for local run")

    try:
        # Post review comments for each agent review
        for agent_review in payload.reviews:
            # Post agent summary comment
            if agent_review.summary:
                try:
                    comment_url = (
                        f"{self.base_url}/2.0/repositories/{self.workspace}/"
                        f"{self.repo_slug}/pullrequests/{self.pull_request.number}/comments"
                    )
                    comment_data = {
                        "content": {
                            "raw": f"## {agent_review.agent_name}\n\n"
                            f"**Focus Areas:** {', '.join(agent_review.focus_areas)}\n\n"
                            f"{agent_review.summary}"
                        }
                    }
                    response = requests.post(comment_url, json=comment_data, headers=self.auth_headers)
                    response.raise_for_status()
                except Exception as e:
                    logger.warning(f"Failed to post agent summary for {agent_review.agent_name}: {e}")

            # Post file-specific comments
            for file_review in agent_review.reviews:
                if file_review.line_comments:
                    # Create review comments for specific lines
                    for line, comment in file_review.line_comments.items():
                        # Keys may be non-numeric strings; extract the first
                        # digit run, defaulting to 0 when none is found.
                        try:
                            line_number = int(line)
                        except ValueError:
                            match = re.match(r"\D*(\d+)", str(line))
                            if match:
                                line_number = int(match.group(1))
                            else:
                                line_number = 0
                        try:
                            # Post a comment on the PR
                            comment_url = (
                                f"{self.base_url}/2.0/repositories/{self.workspace}/"
                                f"{self.repo_slug}/pullrequests/{self.pull_request.number}/comments"
                            )
                            comment_data = {
                                "content": {"raw": f"## 🔦🐛\n{comment}"},
                                # Inline anchor: a single-line span ending at line_number.
                                "inline": {
                                    "from": line_number - 1 if line_number != 0 else 0,
                                    "to": line_number,
                                    "start_from": line_number - 1 if line_number != 0 else 0,
                                    "start_to": line_number,
                                    "path": file_review.file_path,
                                },
                            }
                            response = requests.post(comment_url, json=comment_data, headers=self.auth_headers)
                            response.raise_for_status()
                        except Exception as e:
                            logger.warning(f"Failed to post comment for {file_review.file_path}:{line}: {e}")

                # Post structured comments (e.g. from agentic review) as inline comments
                if file_review.structured_comments:
                    comment_url = (
                        f"{self.base_url}/2.0/repositories/{self.workspace}/"
                        f"{self.repo_slug}/pullrequests/{self.pull_request.number}/comments"
                    )
                    for sc in file_review.structured_comments:
                        # Muted findings are intentionally suppressed.
                        if getattr(sc, "muted", False):
                            continue
                        line_number = sc.line_number
                        try:
                            comment_data = {
                                "content": {"raw": f"## 🔦🐛 [{sc.severity}] {sc.comment}"},
                                "inline": {
                                    "from": line_number - 1 if line_number != 0 else 0,
                                    "to": line_number,
                                    "start_from": line_number - 1 if line_number != 0 else 0,
                                    "start_to": line_number,
                                    "path": file_review.file_path,
                                },
                            }
                            response = requests.post(comment_url, json=comment_data, headers=self.auth_headers)
                            response.raise_for_status()
                        except Exception as e:
                            logger.warning(f"Failed to post comment for {file_review.file_path}:{line_number}: {e}")

                # Post file summary comment if no inline comments were posted.
                # NOTE(review): this checks whether inline comments were
                # *attempted* (present and not muted), not whether the POSTs
                # above actually succeeded.
                has_inline = bool(file_review.line_comments) or any(
                    not getattr(sc, "muted", False) for sc in file_review.structured_comments
                )
                if not has_inline and file_review.summary:
                    try:
                        comment_url = (
                            f"{self.base_url}/2.0/repositories/{self.workspace}/"
                            f"{self.repo_slug}/pullrequests/{self.pull_request.number}/comments"
                        )
                        comment_data = {"content": {"raw": f"**{file_review.file_path}:** {file_review.summary}"}}
                        response = requests.post(comment_url, json=comment_data, headers=self.auth_headers)
                        response.raise_for_status()
                    except Exception as e:
                        logger.warning(f"Failed to post summary for {file_review.file_path}: {e}")

        logger.info(f"✅ Successfully posted PR #{self.pull_request.number} review comments on Bitbucket")
    except requests.exceptions.RequestException as e:
        logger.error(f"❌ Failed to post Bitbucket PR review: {e}")
        # Fallback to console output
        logger.info("Review:")
        logger.info(payload.review_markdown)
    except Exception as e:
        logger.error(f"❌ Unexpected error posting Bitbucket PR review: {e}")
        # Fallback to console output
        logger.info("Review:")
        logger.info(payload.review_markdown)
has_reviewed() -> bool

Check if the token user has already reviewed this PR.

Source code in packages/lampe-cli/src/lampe/cli/providers/bitbucket.py
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
def has_reviewed(self) -> bool:
    """Check if the token user has already reviewed this PR.

    Identification is attempted in two stages:

    1. Resolve the token owner via ``/2.0/user`` and look for any PR comment
       authored by that user (matched by uuid/account_id, falling back to
       username/nickname).
    2. If the token cannot access ``/2.0/user`` (repository/workspace tokens
       get a 401), fall back to matching comment bodies against known Lampe
       review text patterns.

    Returns False for local runs (PR number 0) and on any request failure.
    """
    if self.pull_request.number == 0:
        # Local run without a real PR; nothing to check against.
        return False

    try:
        # Get PR comments
        comments_url = (
            f"{self.base_url}/2.0/repositories/{self.workspace}/"
            f"{self.repo_slug}/pullrequests/{self.pull_request.number}/comments"
        )
        comments_response = requests.get(comments_url, headers=self.auth_headers)
        comments_response.raise_for_status()
        comments_data = comments_response.json()

        # Try to get the current authenticated user (token owner)
        # This works for user tokens but may fail for repository/workspace tokens
        token_user_uuid = None
        token_username = None
        try:
            user_info_response = requests.get(f"{self.base_url}/2.0/user", headers=self.auth_headers)
            user_info_response.raise_for_status()
            user_info = user_info_response.json()
            # Prefer the stable identifiers; only use display names when absent.
            token_user_uuid = user_info.get("uuid") or user_info.get("account_id")
            if not token_user_uuid:
                token_username = user_info.get("username") or user_info.get("nickname")
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 401:
                # Repository/workspace tokens can't access /2.0/user
                # Fall back to pattern-based detection
                logger.debug("Token doesn't have access to /2.0/user endpoint, using pattern-based detection")
            else:
                raise

        # Check for comments by the token user (if we have user identity)
        if token_user_uuid or token_username:
            for comment in comments_data.get("values", []):
                user = comment.get("user", {})
                if token_user_uuid:
                    if user.get("uuid") == token_user_uuid or user.get("account_id") == token_user_uuid:
                        return True
                elif token_username:
                    if user.get("username") == token_username or user.get("nickname") == token_username:
                        return True

        # Fallback: Check for review comments by pattern (for repository/workspace tokens)
        # Look for comments that match Lampe review format:
        # - Comments starting with "## " (agent name headers)
        # - Comments containing "Focus Areas:"
        # - Comments containing "🔦🐛" (line comment marker)
        review_patterns = [
            r"^##\s+\w+",  # Agent name header (e.g., "## SecurityAgent")
            r"\*\*Focus Areas:\*\*",  # Focus areas marker
            r"##\s*🔦🐛",  # Line comment marker
        ]

        for comment in comments_data.get("values", []):
            # Bitbucket may expose comment text as "raw" or "markup".
            content = comment.get("content", {}).get("raw", "") or comment.get("content", {}).get("markup", "")
            if content:
                for pattern in review_patterns:
                    if re.search(pattern, content, re.IGNORECASE | re.MULTILINE):
                        return True

        return False
    except requests.exceptions.RequestException as e:
        # Treat connectivity/API failures as "not reviewed" rather than aborting.
        logger.warning(f"Failed to check if PR has been reviewed: {e}")
        return False
    except Exception as e:
        logger.warning(f"Unexpected error checking if PR has been reviewed: {e}")
        return False
healthcheck() -> None

Check if the Bitbucket provider is healthy and can connect to Bitbucket.

Source code in packages/lampe-cli/src/lampe/cli/providers/bitbucket.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
def healthcheck(self) -> None:
    """Check if the Bitbucket provider is healthy and can connect to Bitbucket.

    Verifies, in order:

    1. BITBUCKET_WORKSPACE and BITBUCKET_REPO_SLUG are set.
    2. Either token auth (LAMPE_BITBUCKET_TOKEN) or app auth
       (LAMPE_BITBUCKET_APP_KEY + LAMPE_BITBUCKET_APP_SECRET) is configured.
    3. The repository endpoint is reachable with the configured credentials.

    Raises
    ------
    ValueError
        If required environment variables or credentials are missing.
    requests.exceptions.RequestException
        If the repository endpoint cannot be reached (re-raised after
        logging troubleshooting tips).
    """
    logger.info("🔍 Checking Bitbucket provider health...")

    # Check Bitbucket environment variables
    workspace = os.getenv("BITBUCKET_WORKSPACE")
    repo_slug = os.getenv("BITBUCKET_REPO_SLUG")

    if not workspace or not repo_slug:
        logger.info("❌ Bitbucket environment variables not set")
        logger.info("   Set both:")
        logger.info("   - BITBUCKET_WORKSPACE (e.g., 'my-workspace')")
        logger.info("   - BITBUCKET_REPO_SLUG (e.g., 'my-repo')")
        raise ValueError("BITBUCKET_WORKSPACE and BITBUCKET_REPO_SLUG environment variables are required")

    logger.info(f"✅ BITBUCKET_WORKSPACE set to: {workspace}")
    logger.info(f"✅ BITBUCKET_REPO_SLUG set to: {repo_slug}")

    # Check authentication environment variables
    token = os.getenv("LAMPE_BITBUCKET_TOKEN")
    app_key = os.getenv("LAMPE_BITBUCKET_APP_KEY")
    app_secret = os.getenv("LAMPE_BITBUCKET_APP_SECRET")

    # Token auth takes precedence over app auth when both are configured.
    auth_method = None
    if token:
        auth_method = "Token"
        logger.info("✅ Bitbucket token authentication detected")
    elif app_key and app_secret:
        auth_method = "App"
        logger.info("✅ Bitbucket App authentication detected")
    else:
        logger.info("❌ No Bitbucket authentication found")
        logger.info("   Set either:")
        logger.info("   - LAMPE_BITBUCKET_TOKEN for token authentication")
        logger.info("   - LAMPE_BITBUCKET_APP_KEY and LAMPE_BITBUCKET_APP_SECRET for app authentication")
        raise ValueError("No Bitbucket authentication found")

    # Test Bitbucket connection
    try:
        # Test API access by getting repository info
        repo_url = f"{self.base_url}/2.0/repositories/{workspace}/{repo_slug}"
        response = requests.get(repo_url, headers=self.auth_headers)
        response.raise_for_status()
        repo_data = response.json()

        logger.info(f"✅ Repository access confirmed: {repo_data.get('full_name', f'{workspace}/{repo_slug}')}")
        logger.info(f"   Description: {repo_data.get('description') or 'No description'}")
        logger.info(f"   Private: {repo_data.get('is_private', 'Unknown')}")
        logger.info(f"✅ Bitbucket {auth_method} authentication successful")

    except requests.exceptions.RequestException as e:
        logger.info(f"❌ Bitbucket connection failed: {e}")
        logger.info("\nTroubleshooting tips:")
        if auth_method == "Token":
            logger.info("- Verify LAMPE_BITBUCKET_TOKEN is valid and has appropriate permissions")
            logger.info("- Ensure the token has 'repositories:read' scope")
        else:
            logger.info("- Verify LAMPE_BITBUCKET_APP_KEY and LAMPE_BITBUCKET_APP_SECRET are correct")
            logger.info("- Ensure the Bitbucket App is installed on the workspace")
        raise
    except Exception as e:
        logger.info(f"❌ Unexpected error during Bitbucket healthcheck: {e}")
        raise
console
ConsoleProvider(repository: Repository, pull_request: PullRequest)

Bases: Provider

Console provider for delivering PR descriptions to stdout.

Source code in packages/lampe-cli/src/lampe/cli/providers/console.py
16
17
def __init__(self, repository: Repository, pull_request: PullRequest) -> None:
    """Initialize the console provider; delegates straight to the base Provider."""
    super().__init__(repository, pull_request)
deliver_pr_description(payload: PRDescriptionPayload) -> None

Print the PR description to console.

Source code in packages/lampe-cli/src/lampe/cli/providers/console.py
19
20
21
def deliver_pr_description(self, payload: PRDescriptionPayload) -> None:
    """Write the generated PR description to standard output."""
    description_text = payload.description
    print(description_text)
deliver_pr_review(payload: PRReviewPayload) -> None

Print the PR review to console.

Source code in packages/lampe-cli/src/lampe/cli/providers/console.py
23
24
25
def deliver_pr_review(self, payload: PRReviewPayload) -> None:
    """Write the generated review markdown to standard output."""
    review_text = payload.review_markdown
    print(review_text)
has_reviewed() -> bool

Check if the token user has already reviewed this PR.

Source code in packages/lampe-cli/src/lampe/cli/providers/console.py
31
32
33
34
def has_reviewed(self) -> bool:
    """Always report the PR as unreviewed.

    The console provider has no backing service, so there is no comment
    history it could inspect for an existing review.
    """
    return False
healthcheck() -> None

Check if the console provider is healthy and can connect to the service.

Source code in packages/lampe-cli/src/lampe/cli/providers/console.py
27
28
29
def healthcheck(self) -> None:
    """Report the console provider as healthy; writing to stdout needs no connectivity."""
    status_message = "✅ Console provider is healthy"
    logger.info(status_message)
github
GitHubProvider(repository: Repository, pull_request: PullRequest)

Bases: Provider

GitHub provider for delivering PR descriptions to GitHub API.

Source code in packages/lampe-cli/src/lampe/cli/providers/github.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def __init__(self, repository: Repository, pull_request: PullRequest) -> None:
    """Initialize the GitHub provider.

    A PR number of 0 means "not supplied on the command line"; in that case
    it is taken from the PR_NUMBER environment variable instead.
    """
    if pull_request.number == 0:
        env_pr_number = os.getenv("PR_NUMBER")
        if not env_pr_number:
            raise ValueError("PR_NUMBER environment variable is required for GitHub provider")
        pull_request.number = int(env_pr_number)

    super().__init__(repository, pull_request)

    # github action has many default environment variables, including the repository full name:
    # https://docs.github.com/en/actions/reference/workflows-and-actions/variables#default-environment-variables
    full_name = os.getenv("GITHUB_REPOSITORY")
    if not full_name:
        raise ValueError("GITHUB_REPOSITORY environment variable is required for GitHub provider")
    self.owner, self.repo_name = full_name.split("/")

    # Initialize GitHub client with appropriate authentication
    self.github_client = self._initialize_github_client()
deliver_pr_description(payload: PRDescriptionPayload) -> None

Update the PR description on GitHub.

Source code in packages/lampe-cli/src/lampe/cli/providers/github.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
def deliver_pr_description(self, payload: PRDescriptionPayload) -> None:
    """Update the PR description on GitHub.

    Rewrites the ``description``-tagged section of the PR body via
    ``update_or_add_text_between_tags`` so any text outside that section is
    left untouched. On failure the description is logged to the console
    instead, so a delivery problem never aborts the run.

    Raises
    ------
    ValueError
        If called for a local run (PR number 0).
    """
    if self.pull_request.number == 0:
        raise ValueError("Cannot update GitHub PR description for local run")

    try:
        repo = self.github_client.get_repo(f"{self.owner}/{self.repo_name}")
        pull_request = repo.get_pull(self.pull_request.number)
        new_description = update_or_add_text_between_tags(
            pull_request.body or "", payload.description_with_title, "description"
        )
        pull_request.edit(body=new_description)
        logger.info(f"✅ Successfully updated PR #{self.pull_request.number} description on GitHub")
    except Exception as e:
        # Log at error level (not info) so failures stand out in CI logs,
        # matching the Bitbucket provider's error handling.
        logger.error(f"❌ Failed to update GitHub PR: {e}")
        # Fallback to console output
        logger.info("Description:")
        logger.info(payload.description)
deliver_pr_review(payload: PRReviewPayload) -> None

Post PR review comments on GitHub.

Source code in packages/lampe-cli/src/lampe/cli/providers/github.py
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
def deliver_pr_review(self, payload: PRReviewPayload) -> None:
    """Post PR review comments on GitHub.

    For each agent review in the payload:

    - posts the agent summary (with its focus areas) as an issue comment,
    - posts ``line_comments`` and non-muted ``structured_comments`` as
      inline review comments on the head commit, falling back to a plain
      issue comment when an inline post fails,
    - posts the file summary as an issue comment only when the file
      produced no inline comments.

    Individual comment failures are logged and skipped; a failure outside
    the per-comment handlers falls back to logging the whole review.

    Raises
    ------
    ValueError
        If called for a local run (PR number 0).
    """
    if self.pull_request.number == 0:
        raise ValueError("Cannot post GitHub PR review for local run")

    try:
        repo = self.github_client.get_repo(f"{self.owner}/{self.repo_name}")
        pull_request = repo.get_pull(self.pull_request.number)

        # Post review comments for each agent review
        for agent_review in payload.reviews:
            # Post agent summary comment
            if agent_review.summary:
                try:
                    pull_request.create_issue_comment(
                        f"## {agent_review.agent_name}\n\n"
                        f"**Focus Areas:** {', '.join(agent_review.focus_areas)}\n\n"
                        f"{agent_review.summary}"
                    )
                except Exception as e:
                    logger.warning(f"Failed to post agent summary for {agent_review.agent_name}: {e}")

            # Post file-specific comments
            for file_review in agent_review.reviews:
                if file_review.line_comments:
                    # Create review comments for specific lines
                    for line, comment in file_review.line_comments.items():
                        try:
                            # Post a review comment
                            pull_request.create_review_comment(
                                body=f"## 🔦🐛\n{comment}",
                                commit=pull_request.head.sha,
                                path=file_review.file_path,
                                line=int(line),
                            )
                        except Exception as e:
                            logger.warning(f"Failed to post comment for {file_review.file_path}:{line}: {e}")
                            # Fallback: post as general comment
                            pull_request.create_issue_comment(
                                f"**{file_review.file_path} (Line {line}):** {comment}"
                            )

                # Post structured comments (e.g. from agentic review) as inline comments
                if file_review.structured_comments:
                    for sc in file_review.structured_comments:
                        if getattr(sc, "muted", False):
                            continue
                        badges = _format_structured_comment_badges(sc.severity, sc.category)
                        comment_body = f"## 🔦🐛\n\n{badges}\n\n{sc.comment}"
                        try:
                            pull_request.create_review_comment(
                                body=comment_body,
                                commit=pull_request.head.sha,
                                path=file_review.file_path,
                                line=sc.line_number,
                            )
                        except Exception as e:
                            logger.warning(
                                f"Failed to post comment for {file_review.file_path}:{sc.line_number}: {e}"
                            )
                            pull_request.create_issue_comment(
                                f"**{file_review.file_path} (Line {sc.line_number}):**\n\n" f"{comment_body}"
                            )

                # Post summary comment if no inline comments were posted
                # NOTE(review): computed from the payload contents, i.e. comments
                # that were *attempted*, not necessarily posted successfully.
                has_inline = bool(file_review.line_comments) or any(
                    not getattr(sc, "muted", False) for sc in file_review.structured_comments
                )
                if not has_inline and file_review.summary:
                    pull_request.create_issue_comment(f"**{file_review.file_path}:** {file_review.summary}")

        logger.info(f"✅ Successfully posted PR #{self.pull_request.number} review comments on GitHub")
    except Exception as e:
        logger.info(f"❌ Failed to post GitHub PR review: {e}")
        # Fallback to console output
        logger.info("Review:")
        logger.info(payload.review_markdown)
has_reviewed() -> bool

Check if the token user has already reviewed this PR.

Source code in packages/lampe-cli/src/lampe/cli/providers/github.py
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
def has_reviewed(self) -> bool:
    """Check if the token user has already reviewed this PR.

    Scans both issue comments (where summaries are posted) and inline
    review comments for anything authored by the authenticated user.
    Returns False for local runs (PR number 0) and on any API failure.
    """
    if self.pull_request.number == 0:
        return False

    try:
        repository = self.github_client.get_repo(f"{self.owner}/{self.repo_name}")
        pr = repository.get_pull(self.pull_request.number)
        my_login = self.github_client.get_user().login

        # Check issue comments first, then inline review comments.
        if any(comment.user.login == my_login for comment in pr.get_issue_comments()):
            return True
        return any(comment.user.login == my_login for comment in pr.get_review_comments())
    except Exception as e:
        logger.warning(f"Failed to check if PR has been reviewed: {e}")
        return False
healthcheck() -> None

Check if the GitHub provider is healthy and can connect to GitHub.

Source code in packages/lampe-cli/src/lampe/cli/providers/github.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
def healthcheck(self) -> None:
    """Check if the GitHub provider is healthy and can connect to GitHub.

    Verifies, in order:

    1. GITHUB_REPOSITORY is set and has the "owner/repo" shape.
    2. Either GitHub App credentials (LAMPE_GITHUB_APP_ID +
       LAMPE_GITHUB_APP_PRIVATE_KEY) or a user token (LAMPE_GITHUB_TOKEN)
       is configured.
    3. The repository is reachable with the configured credentials.

    Raises
    ------
    ValueError
        If required environment variables or credentials are missing.
    Exception
        Re-raised (after logging troubleshooting tips) when the repository
        lookup fails.
    """
    logger.info("🔍 Checking GitHub provider health...")

    # Check GitHub repository environment variable
    github_repo = os.getenv("GITHUB_REPOSITORY")
    if not github_repo or len(github_repo.split("/")) != 2:
        logger.info("❌ GITHUB_REPOSITORY environment variable not set")
        logger.info("   Set it to 'owner/repo' format (e.g., 'montagne-dev/lampe')")
        raise ValueError("GITHUB_REPOSITORY environment variable not set")
    logger.info(f"✅ GITHUB_REPOSITORY set to: {github_repo}")

    # Check authentication environment variables
    app_id = os.getenv("LAMPE_GITHUB_APP_ID")
    private_key = os.getenv("LAMPE_GITHUB_APP_PRIVATE_KEY")
    token = os.getenv("LAMPE_GITHUB_TOKEN")

    # App credentials take precedence over a user token when both are set.
    auth_method = None
    if app_id and private_key:
        auth_method = "GitHub App"
        logger.info(f"✅ GitHub App authentication detected (App ID: {app_id})")
    elif token:
        auth_method = "User Token"
        logger.info("✅ User token authentication detected")
    else:
        logger.info("❌ No GitHub authentication found")
        logger.info("   Set either:")
        logger.info("   - LAMPE_GITHUB_APP_ID and LAMPE_GITHUB_APP_PRIVATE_KEY for GitHub App")
        logger.info("   - LAMPE_GITHUB_TOKEN for user token authentication")
        raise ValueError("No GitHub authentication found")

    # Test GitHub connection
    try:
        # Test API access by getting repository info
        repo_info = self.github_client.get_repo(github_repo)
        logger.info(f"✅ Repository access confirmed: {repo_info.full_name}")
        logger.info(f"   Description: {repo_info.description or 'No description'}")
        logger.info(f"   Private: {repo_info.private}")
        logger.info(f"✅ GitHub {auth_method} authentication successful")

    except Exception as e:
        logger.info(f"❌ GitHub connection failed: {e}")
        logger.info("\nTroubleshooting tips:")
        if auth_method == "GitHub App":
            logger.info("- Verify LAMPE_GITHUB_APP_ID and LAMPE_GITHUB_APP_PRIVATE_KEY are correct")
            logger.info("- Ensure the GitHub App is installed on the repository")
            logger.info("- Check that the private key is properly formatted")
        else:
            logger.info("- Verify LAMPE_GITHUB_TOKEN is valid and has appropriate permissions")
            logger.info("- Ensure the token has 'repo' scope for private repositories")
        raise

core

data_models

Issue

Bases: BaseModel

Individual issue to be resolved.

PullRequest

Bases: BaseModel

Pull request information.

Repository

Bases: BaseModel

Repository information.

issue
Issue

Bases: BaseModel

Individual issue to be resolved.

pull_request
PullRequest

Bases: BaseModel

Pull request information.

repository
Repository

Bases: BaseModel

Repository information.

gitconfig

init_git()

Initialize Git configuration and check version requirements.

Source code in src/lampe/core/gitconfig.py
58
59
60
61
def init_git():
    """Initialize Git configuration and check version requirements.

    NOTE(review): the boolean returned by valid_git_version_available() is
    ignored here — an unsupported Git version is only logged (critically),
    never raised. Confirm this best-effort behavior is intended.
    """
    logger.debug("Initializing Git configuration...")
    valid_git_version_available()
valid_git_version_available() -> bool

Check if the installed Git version meets the minimum requirement.

Returns:

Type Description
bool

True if Git version meets requirement, False otherwise

Source code in src/lampe/core/gitconfig.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def valid_git_version_available() -> bool:
    """
    Check if the installed Git version meets the minimum requirement.

    Returns
    -------
    :
        True if Git version meets requirement, False otherwise
    """
    try:
        output = git.Git().version().strip()
        if not output:
            logger.critical("Unable to determine Git version from output.")
            return False

        # Output looks like "git version 2.39.0"; the version is the third token.
        tokens = output.split()
        if len(tokens) < 3:
            logger.critical(f"Unexpected Git version output format: {output}")
            return False

        # Keep only the "major.minor.patch" part, dropping platform suffixes
        # such as ".windows.1".
        semantic_version = ".".join(tokens[2].split(".")[:3])

        if version.parse(semantic_version) < version.parse(MINIMUM_GIT_VERSION):
            logger.critical(
                f"CRITICAL: Git version {semantic_version} does not meet the minimum requirement "
                f"({MINIMUM_GIT_VERSION}+). The lampe-sdk requires Git {MINIMUM_GIT_VERSION} or higher "
                f"for proper functionality. Git operations may fail or behave unexpectedly. "
                f"Please upgrade your Git installation. See the README for installation instructions."
            )
            return False

        logger.debug(f"Git version {semantic_version} meets requirement ({MINIMUM_GIT_VERSION}+)")
        return True
    except Exception as e:
        logger.critical(f"Unexpected error while checking Git version: {e}")
        return False

langfuseconfig

trace_span(func: Callable) -> Callable

Decorator that adds any non-auto-instrumented function to the langfuse trace.

Source code in src/lampe/core/langfuseconfig.py
67
68
69
70
71
72
73
74
75
76
77
def trace_span(func: Callable) -> Callable:
    """Decorator that adds any non auto-instrumented function to the langfuse trace."""

    @wraps(func)
    def wrapper(*args, **kwargs):
        # Without a dispatcher (tracing disabled) just run the function directly.
        if not dispatcher:
            return func(*args, **kwargs)
        return dispatcher.span(func)(*args, **kwargs)

    return wrapper
trace_with_function_name(func: Callable) -> Callable

Decorator that automatically updates the current trace with function name and metadata.

The decorated function should have a 'metadata' parameter to pass additional metadata to the trace.

Source code in src/lampe/core/langfuseconfig.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def trace_with_function_name(func: Callable) -> Callable:
    """
    Decorator that automatically updates the current trace with function name and metadata.

    The decorated function should have a 'metadata' parameter to pass additional metadata
    to the trace.
    """
    # Hoisted from the bottom of the function so the dependency is visible up front.
    import inspect

    def _record_trace(kwargs: dict) -> None:
        # Tag the current trace with the wrapped function's name and any
        # caller-supplied metadata (defaults to an empty dict). Shared by
        # both wrappers to avoid duplicating this logic.
        update_current_trace(metadata=kwargs.get("metadata", {}), tags=[func.__name__])

    @wraps(func)
    async def async_wrapper(*args, **kwargs) -> Any:
        _record_trace(kwargs)
        return await func(*args, **kwargs)

    @wraps(func)
    def sync_wrapper(*args, **kwargs) -> Any:
        _record_trace(kwargs)
        return func(*args, **kwargs)

    # Return the wrapper matching the wrapped function's (a)synchrony.
    return async_wrapper if inspect.iscoroutinefunction(func) else sync_wrapper

llmconfig

get_model(env_var: str, default: str) -> str

Return model from env var or default. For use with LAMPE_MODEL_* variables.

Source code in src/lampe/core/llmconfig.py
18
19
20
def get_model(env_var: str, default: str) -> str:
    """Return model from env var or default. For use with LAMPE_MODEL_* variables."""
    return os.getenv(env_var) or default
provider_from_model(model: str) -> str | None

Extract provider from LiteLLM model string (e.g. anthropic/claude-..., openai/gpt-...).

Source code in src/lampe/core/llmconfig.py
23
24
25
26
27
28
29
30
def provider_from_model(model: str) -> str | None:
    """Extract provider from LiteLLM model string (e.g. anthropic/claude-..., openai/gpt-...)."""
    m = model.lower()
    if "anthropic" in m:
        return "anthropic"
    if "openai" in m:
        return "openai"
    return None

parsers

MarkdownCodeBlockRemoverOutputParser

Bases: BaseOutputParser

Output parser that extracts and returns the content of markdown code blocks marked with 'md' or 'markdown'.

This parser is designed to process LLM outputs or other text that may contain markdown code blocks.
It specifically targets code blocks with the language tag 'md' or 'markdown', removing the code block
markers and returning only the inner content. If no such block is found, it falls back to extracting
a generic code block (```). If the result still contains any other code block (with a language tag),
it is preserved as-is. If no code block is found, the original text (stripped of leading/trailing whitespace)
is returned.
Edge Cases:
- If the input is an empty string, returns an empty string.
- If the input contains a code block with a language other than 'md' or 'markdown', it is preserved.
- If the input contains text before or after a markdown code block, only the content inside the block is returned.
- If the input contains an incomplete code block, returns the input with the trailing backticks removed if present.
Examples
>>> parser = MarkdownCodeBlockRemoverOutputParser()
>>> text = '''```md
... This is inside md block.
... ```'''
>>> parser.parse(text)
'This is inside md block.'

>>> text = '''```python
... Multiple lines
... are here.
... ```'''
>>> parser.parse(text)
'```python
Multiple lines
are here.
```'

>>> text = 'No code block here.'
>>> parser.parse(text)
'No code block here.'
parse(output: str) -> str

Extracts and returns the content of a markdown code block marked with ```md or ```markdown from the input text.

If the input contains a markdown code block with language tag 'md' or 'markdown', the content inside that block is returned, with the code block markers removed. If no such block is found, but a generic code block (```) is present, its content is returned. If the result still contains any other code block (with a language tag), it is preserved as-is. If no code block is found, the original text (stripped of leading/trailing whitespace) is returned.

Source code in src/lampe/core/parsers/markdown_code_block_remover_output.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def parse(self, output: str) -> str:
    """
    Extracts and returns the content of a markdown code block marked with ```md or ```markdown from the input text.

    If the input contains a markdown code block with language tag 'md' or 'markdown',
    the content inside that block is returned, with the code block markers removed.
    If no such block is found, but a generic code block (```) is present, its content is returned.
    If the result still contains any other code block (with a language tag), it is preserved as-is.
    If no code block is found, the original text (stripped of leading/trailing whitespace) is returned.
    """
    if output == "":
        return output

    # Prefer an 'md'/'markdown' block, then a bare ``` block, then the raw text.
    for language in ("md", "markdown", ""):
        content = extract_md_code_block(output, language)
        if content:
            break
    else:
        content = output.strip()

    if extract_md_code_block(content, match_any_language=True) is not None:
        # Another fenced block survives; keep its backticks intact.
        return content

    # Strip any stray leading/trailing fence markers.
    content = content.removeprefix("```")
    return content.removesuffix("```")
YAMLPydanticOutputParser

Bases: PydanticOutputParser[Model], Generic[Model]

A parser that extracts and validates YAML content using Pydantic models.

Parameters:

Name Type Description Default
output_cls

Pydantic output class used for validation

required
excluded_schema_keys_from_format

Schema keys to exclude from format string, by default None

required
pydantic_format_tmpl

Template for format string, by default PYDANTIC_FORMAT_TMPL

required
Notes

This parser extracts YAML content from markdown code blocks, validates the structure using a Pydantic model, and returns the validated data. It first looks for YAML-specific code blocks, then falls back to any code block if needed.

format_string: str property

Get the format string that instructs the LLM how to output YAML.

This method will provide a format string that includes the Pydantic model's JSON schema converted to a YAML example, helping the LLM understand the expected output structure.

Returns:

Type Description
str

Format string with YAML schema example

Raises:

Type Description
NotImplementedError

The method is not yet implemented

parse(text: str) -> Model

Extract, parse and validate YAML content using the configured Pydantic model.

Parameters:

Name Type Description Default
text str

Raw text containing YAML content in markdown code blocks

required

Returns:

Type Description
Model

Validated data matching the Pydantic model structure

Raises:

Type Description
YAMLParsingError

If no valid YAML content is found in the text or if the YAML parsing fails due to syntax errors

ValidationError

If the data does not match the Pydantic model schema

Source code in src/lampe/core/parsers/yaml_pydantic_output.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def parse(self, text: str) -> Model:
    """
    Parse YAML found in ``text`` and validate it against the configured Pydantic model.

    Parameters
    ----------
    text
        Raw text containing YAML content in markdown code blocks

    Returns
    -------
    :
        Validated data matching the Pydantic model structure

    Raises
    ------
    YAMLParsingError
        If no valid YAML content is found in the text or if the YAML parsing fails due to syntax errors
    ValidationError
        If the data does not match the Pydantic model schema
    """
    if not text:
        raise YAMLParsingError("No text provided")

    # Prefer an explicit ```yaml block; fall back to any fenced block, then to the raw text.
    candidate = extract_md_code_block(text, "yaml")
    if not candidate:
        logger.warning("No YAML block found, attempting to parse generic code block")
        candidate = extract_md_code_block(text)
    if not candidate:
        candidate = text

    try:
        loaded = yaml.safe_load(candidate)
    except yaml.YAMLError as e:
        raise YAMLParsingError(f"Invalid YAML syntax: {e}") from e

    return self.output_cls.model_validate(loaded)
markdown_code_block_remover_output
MarkdownCodeBlockRemoverOutputParser

Bases: BaseOutputParser

Output parser that extracts and returns the content of markdown code blocks marked with 'md' or 'markdown'.

This parser is designed to process LLM outputs or other text that may contain markdown code blocks.
It specifically targets code blocks with the language tag 'md' or 'markdown', removing the code block
markers and returning only the inner content. If no such block is found, it falls back to extracting
a generic code block (```). If the result still contains any other code block (with a language tag),
it is preserved as-is. If no code block is found, the original text (stripped of leading/trailing whitespace)
is returned.
Edge Cases:
- If the input is an empty string, returns an empty string.
- If the input contains a code block with a language other than 'md' or 'markdown', it is preserved.
- If the input contains text before or after a markdown code block, only the content inside the block is returned.
- If the input contains an incomplete code block, returns the input with the trailing backticks removed if present.
Examples
>>> parser = MarkdownCodeBlockRemoverOutputParser()
>>> text = '''```md
... This is inside md block.
... ```'''
>>> parser.parse(text)
'This is inside md block.'

>>> text = '''```python
... Multiple lines
... are here.
... ```'''
>>> parser.parse(text)
'```python\nMultiple lines\nare here.\n```'

>>> text = 'No code block here.'
>>> parser.parse(text)
'No code block here.'
parse(output: str) -> str

Extracts and returns the content of a markdown code block marked with ```md or ```markdown from the input text.

If the input contains a markdown code block with language tag 'md' or 'markdown', the content inside that block is returned, with the code block markers removed. If no such block is found, but a generic code block (```) is present, its content is returned. If the result still contains any other code block (with a language tag), it is preserved as-is. If no code block is found, the original text (stripped of leading/trailing whitespace) is returned.

Source code in src/lampe/core/parsers/markdown_code_block_remover_output.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def parse(self, output: str) -> str:
    """
    Return the content of a markdown code block tagged ```md or ```markdown from the input text.

    Resolution order: an 'md'-tagged block, then a 'markdown'-tagged block, then a
    generic untagged block; if none is found, the stripped input itself is used.
    When the resolved content still contains a language-tagged code block, it is
    returned unchanged so that block is preserved. Otherwise any stray leading or
    trailing triple backticks (from an incomplete fence) are removed.
    """
    if not output:
        return output

    # Try the tagged variants first, then a generic fenced block.
    extracted = None
    for tag in ("md", "markdown", ""):
        extracted = extract_md_code_block(output, tag)
        if extracted:
            break
    content = extracted or output.strip()

    # A remaining language-tagged block must be kept intact, fences and all.
    if extract_md_code_block(content, match_any_language=True) is not None:
        return content

    # Incomplete fence: drop dangling backtick markers at either end.
    if content.startswith("```"):
        content = content[3:]
    if content.endswith("```"):
        content = content[:-3]
    return content
utils
extract_md_code_block(output: str, language: str = '', match_any_language: bool = False) -> str | None

Extract markdown code block content from a string, handling nested code blocks.

Parameters:

Name Type Description Default
output str

The string to extract code block content from.

required
language str

The language identifier for the code block (e.g., 'yaml', 'python', 'json').

''
match_any_language bool

If True, the language of the code block is optional and the function will return the first code block found.

False

Returns:

Type Description
str | None

The extracted code block content, or the entire input if no language is specified or no matching code block is found.

Notes

This function extracts content between ```{language} fence tags, preserving any nested code blocks within the content. The regex pattern handles: - Optional text before the code block - Nested code blocks (e.g. ```json, ```python inside the main block) - Proper indentation of nested content - Case-insensitive language tag matching

Examples:

>>> text = '''
... Some text
... ```yaml
... key: value
... nested: |
...   ```python
...   print("Hello")
...   ```
... ```
... '''
>>> result = extract_md_code_block(text, 'yaml')
>>> print(result)
key: value
nested: |
  ```python
  print("Hello")
  ```
Source code in src/lampe/core/parsers/utils.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def extract_md_code_block(output: str, language: str = "", match_any_language: bool = False) -> str | None:
    """Extract the content of a fenced markdown code block, handling nested fences.

    Parameters
    ----------
    output : str
        The string to search for a fenced code block.
    language : str
        Language tag of the block to match (e.g. 'yaml', 'python', 'json').
        An empty string targets a fence without a language tag.
    match_any_language : bool
        When True, ``language`` is ignored and the first fenced block of any
        language is matched instead.

    Returns
    -------
    :
        The content of the first matching code block, or ``None`` when no
        matching block is present.

    Notes
    -----
    Content is captured between ```{language} fence markers while preserving
    any nested code blocks inside it. The pattern tolerates text before the
    block, keeps the indentation of nested content, and matches the language
    tag case-insensitively.

    Examples
    --------
    >>> text = '''
    ... Some text
    ... ```yaml
    ... key: value
    ... nested: |
    ...   ```python
    ...   print("Hello")
    ...   ```
    ... ```
    ... '''
    >>> result = extract_md_code_block(text, 'yaml')
    >>> print(result)
    key: value
    nested: |
      ```python
      print("Hello")
      ```
    """
    # Substitute either the wildcard-language pattern or the literal tag into the fence regex.
    tag = MARKDOWN_CODE_BLOCK_MATCH_ANY_LANGUAGE_PATTERN if match_any_language else language
    pattern = MARKDOWN_CODE_BLOCK_PATTERN.format(language=tag)

    match = re.search(pattern, output, re.MULTILINE | re.IGNORECASE | re.DOTALL)
    return match.group(1) if match else None
yaml_pydantic_output
YAMLParsingError

Bases: Exception

Raised when YAML parsing or validation fails.

YAMLPydanticOutputParser

Bases: PydanticOutputParser[Model], Generic[Model]

A parser that extracts and validates YAML content using Pydantic models.

Parameters:

Name Type Description Default
output_cls

Pydantic output class used for validation

required
excluded_schema_keys_from_format

Schema keys to exclude from format string, by default None

required
pydantic_format_tmpl

Template for format string, by default PYDANTIC_FORMAT_TMPL

required
Notes

This parser extracts YAML content from markdown code blocks, validates the structure using a Pydantic model, and returns the validated data. It first looks for YAML-specific code blocks, then falls back to any code block if needed.

format_string: str property

Get the format string that instructs the LLM how to output YAML.

This method will provide a format string that includes the Pydantic model's JSON schema converted to a YAML example, helping the LLM understand the expected output structure.

Returns:

Type Description
str

Format string with YAML schema example

Raises:

Type Description
NotImplementedError

The method is not yet implemented

parse(text: str) -> Model

Extract, parse and validate YAML content using the configured Pydantic model.

Parameters:

Name Type Description Default
text str

Raw text containing YAML content in markdown code blocks

required

Returns:

Type Description
Model

Validated data matching the Pydantic model structure

Raises:

Type Description
YAMLParsingError

If no valid YAML content is found in the text or if the YAML parsing fails due to syntax errors

ValidationError

If the data does not match the Pydantic model schema

Source code in src/lampe/core/parsers/yaml_pydantic_output.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def parse(self, text: str) -> Model:
    """
    Extract a YAML payload from ``text`` and validate it with the configured Pydantic model.

    Parameters
    ----------
    text
        Raw text containing YAML content in markdown code blocks

    Returns
    -------
    :
        Validated data matching the Pydantic model structure

    Raises
    ------
    YAMLParsingError
        If no valid YAML content is found in the text or if the YAML parsing fails due to syntax errors
    ValidationError
        If the data does not match the Pydantic model schema
    """
    if not text:
        raise YAMLParsingError("No text provided")

    # Look for a ```yaml fence first; otherwise warn and accept any fenced block,
    # finally falling back to the raw text itself.
    block = extract_md_code_block(text, "yaml")
    if not block:
        logger.warning("No YAML block found, attempting to parse generic code block")
        block = extract_md_code_block(text) or text

    try:
        parsed = yaml.safe_load(block)
    except yaml.YAMLError as e:
        raise YAMLParsingError(f"Invalid YAML syntax: {e}") from e
    return self.output_cls.model_validate(parsed)

tools

TempGitRepository(repo_url: str, head_ref: str | None = None, base_ref: str | None = None, folder_name: str | None = None, sparse: bool = True, shallow: bool = True, blob_filter: bool = True, remove_existing: bool = True)

Context Manager for cloning and cleaning up a local clone of a repository

Uses partial clone optimizations including shallow clone, sparse checkout, and blob filtering to efficiently fetch only required content. Upon exit, will attempt to delete the cloned repository.

Attributes:

Name Type Description
repo_url

Repository URL to clone

head_ref

Optional head ref to check out.

folder_name

Optional name prefix for temp directory

sparse

Enable sparse checkout mode to avoid populating all files initially.

shallow

Enable shallow clone (depth=1) to fetch only the target commit.

blob_filter

Enable blob filtering (--filter=blob:none) to fetch file contents on-demand

remove_existing

Remove existing directory if it exists

Raises:

Type Description
RuntimeError

If Git version check fails

GitCommandError

If clone operation fails

UnableToDeleteError

If unable to delete the cloned repository

Source code in src/lampe/core/tools/repository/management.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def __init__(
    self,
    repo_url: str,
    head_ref: str | None = None,
    base_ref: str | None = None,
    folder_name: str | None = None,
    sparse: bool = True,
    shallow: bool = True,
    blob_filter: bool = True,
    remove_existing: bool = True,
):
    # Store the clone configuration only; no git work happens here —
    # this class is documented as a context manager, so cloning is deferred.
    self.repo_url = repo_url
    self.head_ref = head_ref
    self.base_ref = base_ref
    self.folder_name = folder_name
    self.sparse = sparse
    self.shallow = shallow
    self.blob_filter = blob_filter
    self.remove_existing = remove_existing
    # Path of the local clone; None until the repository is actually cloned.
    self.path_to_local_repo = None
clone_repo(repo_url: str, head_ref: str | None = None, base_ref: str | None = None, folder_name: str | None = None, sparse: bool = True, shallow: bool = True, blob_filter: bool = True, remove_existing: bool = True) -> str

Clone a repository optimized for PR review.

Uses partial clone optimizations including shallow clone, sparse checkout, and blob filtering to efficiently fetch only required content.

Parameters:

Name Type Description Default
repo_url str

Repository URL to clone

required
head_ref str | None

Head ref to checkout

None
base_ref str | None

Base ref to fetch for diff computation

None
folder_name str | None

Optional name prefix for temp directory

None
sparse bool

Enable sparse checkout mode to avoid populating all files initially

True
shallow bool

Enable shallow clone (depth=1) to fetch only the target commit

True
blob_filter bool

Enable blob filtering (--filter=blob:none) to fetch file contents on-demand

True
remove_existing bool

Remove existing directory if it exists

True

Returns:

Type Description
str

Path to the cloned repository

Raises:

Type Description
RuntimeError

If Git version check fails

GitCommandError

If clone operation fails

Source code in src/lampe/core/tools/repository/management.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
def clone_repo(
    repo_url: str,
    head_ref: str | None = None,
    base_ref: str | None = None,
    folder_name: str | None = None,
    sparse: bool = True,
    shallow: bool = True,
    blob_filter: bool = True,
    remove_existing: bool = True,
) -> str:
    """Clone a repository optimized for PR review.

    Uses partial clone optimizations including shallow clone, sparse checkout, and blob filtering
    to efficiently fetch only required content.

    Parameters
    ----------
    repo_url
        Repository URL to clone
    head_ref
        Head ref to checkout
    base_ref
        Base ref to fetch for diff computation
    folder_name
        Optional name prefix for temp directory
    sparse
        Enable sparse checkout mode to avoid populating all files initially
    shallow
        Enable shallow clone (depth=1) to fetch only the target commit
    blob_filter
        Enable blob filtering (--filter=blob:none) to fetch file contents on-demand
    remove_existing
        Remove existing directory if it exists

    Returns
    -------
    :
        Path to the cloned repository

    Raises
    ------
    RuntimeError
        If Git version check fails
    GitCommandError
        If clone operation fails
    """
    if not valid_git_version_available():
        raise RuntimeError("Git version check failed. Please upgrade Git to the minimum required version.")

    # Deterministic path under /tmp when a folder name is given; otherwise a unique temp dir.
    tmp_dir = f"/tmp/{folder_name}" if folder_name else mkdtemp(prefix=str(uuid.uuid4()))
    logger.info(f"Cloning repo (sparse={sparse}, shallow={shallow}, blob_filter={blob_filter}) to {tmp_dir}")

    if os.path.exists(tmp_dir):
        if remove_existing:
            logger.info(f"Removing existing directory {tmp_dir}")
            shutil.rmtree(tmp_dir)
        else:
            # Reuse the pre-existing clone as-is.
            return tmp_dir

    clone_args = []
    if shallow:
        clone_args.extend(["--depth", "1"])
    if sparse:
        clone_args.append("--sparse")
    if blob_filter:
        clone_args.extend(["--filter", "blob:none"])
    if head_ref:
        clone_args.extend(["--revision", head_ref])

    try:
        repo = Repo.clone_from(repo_url, tmp_dir, multi_options=clone_args)
        repository_path = _repo_to_path(repo)
        if sparse and blob_filter:
            logger.info("Partial clone ready - file contents will be fetched on-demand during git operations")
        if base_ref:
            # Make the base ref available locally so diffs against it can be computed later.
            fetch_commit_ref(repository_path, base_ref)
    except GitCommandError as e:
        logger.exception(f"Clone failed: {e}\nClone arguments used: {clone_args}")
        # Bare raise preserves the original exception and traceback (was `raise e`,
        # which added a redundant frame); the dead `repository_path = ""` pre-init
        # was removed — on failure this function never returns.
        raise

    return repository_path
get_diff_between_commits(base_hash: str, head_hash: str = 'HEAD', files_exclude_patterns: list[str] | None = None, files_include_patterns: list[str] | None = None, files_reinclude_patterns: list[str] | None = None, batch_size: int = 50, include_line_numbers: bool = False, repo_path: str = '/tmp/') -> str

Get the diff between two commits, optionally filtering files by glob patterns.

The filtering is done in a specific order to ensure correct pattern application: 1. First, if include patterns are provided, only files matching those patterns are kept 2. Then, exclude patterns are applied to filter out matching files 3. Finally, reinclude patterns can override the exclude patterns to bring back specific files

This order ensures that reinclude patterns only affect files that were actually excluded, preventing the reinclude of files that weren't matched by include patterns in the first place.

Parameters:

Name Type Description Default
base_hash str

Base commit hash to compare from

required
head_hash str

Head commit hash to compare to. If not provided, uses HEAD

'HEAD'
files_exclude_patterns list[str] | None

List of glob patterns to exclude from the diff (relative to repo root). These patterns take precedence over include patterns.

None
files_include_patterns list[str] | None

List of glob patterns to include in the diff (relative to repo root). Note that exclude patterns will override these if there are conflicts.

None
files_reinclude_patterns list[str] | None

List of glob patterns to re-include files that were excluded by the exclude patterns. These patterns will only affect files that were previously excluded.

None
repo_path str

Path to the git repository

'/tmp/'
batch_size int

Number of files to process in each batch.

50
include_line_numbers bool

Whether to include line numbers in diff output (default: False)

False

Returns:

Type Description
str

Diff as a string

Raises:

Type Description
DiffNotFoundError

If there is an unexpected git error

Source code in src/lampe/core/tools/repository/diff.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
def get_diff_between_commits(
    base_hash: str,
    head_hash: str = "HEAD",
    files_exclude_patterns: list[str] | None = None,
    files_include_patterns: list[str] | None = None,
    files_reinclude_patterns: list[str] | None = None,
    batch_size: int = 50,
    include_line_numbers: bool = False,
    repo_path: str = "/tmp/",
) -> str:
    """Get the diff between two commits, optionally filtering files by glob patterns.

    The filtering is done in a specific order to ensure correct pattern application:
    1. First, if include patterns are provided, only files matching those patterns are kept
    2. Then, exclude patterns are applied to filter out matching files
    3. Finally, reinclude patterns can override the exclude patterns to bring back specific files

    This order ensures that reinclude patterns only affect files that were actually excluded,
    preventing the reinclude of files that weren't matched by include patterns in the first place.

    Parameters
    ----------
    base_hash
        Base commit hash to compare from
    head_hash
        Head commit hash to compare to. If not provided, uses HEAD
    files_exclude_patterns
        List of glob patterns to exclude from the diff (relative to repo root).
        These patterns take precedence over include patterns.
    files_include_patterns
        List of glob patterns to include in the diff (relative to repo root).
        Note that exclude patterns will override these if there are conflicts.
    files_reinclude_patterns
        List of glob patterns to re-include files that were excluded by the exclude patterns.
        These patterns will only affect files that were previously excluded.
    repo_path
        Path to the git repository
    batch_size
        Number of files to process in each batch.
    include_line_numbers
        Accepted for interface compatibility; git diff output already carries line
        positions in its @@ -X,Y +A,B @@ hunk headers, so no extra processing is done.

    Returns
    -------
    :
        Diff as a string

    Raises
    ------
    DiffNotFoundError
        If there is an unexpected git error
    """
    try:
        repo = Repo(path=repo_path)
        changed_files = ""
        # Ensure both commits exist locally before listing changed files.
        with LocalCommitsAvailability(repo_path, [base_hash, head_hash]):
            changed_files = repo.git.diff(base_hash, head_hash, "--name-only")

        if files_include_patterns and files_exclude_patterns:
            # Warn when the same pattern appears on both sides; exclusion wins.
            overlap = set(files_include_patterns) & set(files_exclude_patterns)
            if overlap:
                logger.warning(
                    f"Overlapping patterns found in include and exclude patterns: {overlap}. "
                    "Exclude patterns will take precedence as per git pathspec documentation."
                )

        # Apply include -> exclude -> reinclude, in that order (see docstring).
        filtered_files = []
        for f in changed_files.splitlines():
            if files_include_patterns and not any(fnmatch(f, pat) for pat in files_include_patterns):
                continue
            if files_exclude_patterns and any(fnmatch(f, pat) for pat in files_exclude_patterns):
                if not (files_reinclude_patterns and any(fnmatch(f, pat) for pat in files_reinclude_patterns)):
                    continue
            filtered_files.append(f)

        # Diff in batches to keep each git command line a manageable size.
        # NOTE: the original dead `elif include_line_numbers: pass` branch was removed;
        # git's hunk headers already contain line numbers, so nothing extra is needed.
        diffs = []
        for batch in batched(filtered_files, batch_size):
            diff = repo.git.diff(base_hash, head_hash, "--", *batch)
            if diff:
                diffs.append(sanitize_utf8(diff))
        return "\n".join(diffs)
    except GitCommandError as e:
        logger.exception(f"Unexpected error getting diff: {e}")
        raise DiffNotFoundError(f"Diff not found for commits {base_hash} and {head_hash}") from e
get_file_content_at_commit(commit_hash: str, file_path: str, line_start: int | None = None, line_end: int | None = None, include_line_numbers: bool = False, repo_path: str = '/tmp/') -> str

Get file content from a specific commit.

Parameters:

Name Type Description Default
commit_hash str

Commit reference (e.g., "main", commit hash)

required
file_path str

Path to the file within the repository

required
line_start int | None

Line range start index (0-based) of head_content to extract content from

None
line_end int | None

Line range end index (0-based) of head_content to extract content to

None
include_line_numbers bool

Whether to prefix each line with its line number (default: False)

False
repo_path str

Path to the git repository, by default "/tmp/"

'/tmp/'

Returns:

Type Description
str

File content as a string, empty string if file doesn't exist or line range is invalid

Raises:

Type Description
GitCommandError

If the file doesn't exist or any other git error occurs

Source code in src/lampe/core/tools/repository/content.py
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def get_file_content_at_commit(
    commit_hash: str,
    file_path: str,
    line_start: int | None = None,
    line_end: int | None = None,
    include_line_numbers: bool = False,
    repo_path: str = "/tmp/",
) -> str:
    """Read a file's content as it exists at a specific commit.

    Parameters
    ----------
    commit_hash
        Commit reference (e.g., "main", commit hash)
    file_path
        Path to the file within the repository
    line_start
        Line range start index (0-based) of head_content to extract content from
    line_end
        Line range end index (0-based) of head_content to extract content to
    include_line_numbers
        Whether to prefix each line with its line number (default: False)
    repo_path
        Path to the git repository, by default "/tmp/"

    Returns
    -------
    :
        File content as a string, empty string if file doesn't exist or line range is invalid

    Raises
    ------
    GitCommandError
        If the file doesn't exist or any other git error occurs
    """
    try:
        # Refuse to read oversized files whole; callers must request a line range instead.
        if line_start is None and line_end is None:
            file_size = get_file_size_at_commit(file_path, commit_hash, repo_path)
            if file_size > MAX_FILE_SIZE_CHARS:
                error_msg = (
                    f"Error: File too large (>{MAX_FILE_SIZE_CHARS // 1000}KB). File size: {file_size} bytes. "
                    "Cannot read full file content. "
                    "Please use line_start and line_end parameters to read specific line ranges."
                )
                logger.warning(f"File {file_path} at {commit_hash} is too large ({file_size} bytes)")
                return error_msg

        repo = Repo(path=repo_path)
        # Fetch the commit if it is missing locally, then read the blob via git show.
        with LocalCommitsAvailability(repo_path, [commit_hash]):
            content = sanitize_utf8(repo.git.show(f"{commit_hash}:{file_path}"))

        # Both bounds must be given for slicing to apply; the range is inclusive.
        if line_start is not None and line_end is not None:
            content = "\n".join(content.splitlines()[line_start : line_end + 1])

        if include_line_numbers:
            # Number lines from the requested start (or 0 for a full read).
            first = 0 if line_start is None else line_start
            content = "\n".join(
                f"{num:>6}| {line}" for num, line in enumerate(content.splitlines(), start=first)
            )

        return content
    except GitCommandError as e:
        logger.exception(f"Error getting file content: {e}")
        raise
repository
FileDiffInfo

Bases: BaseModel

Information about a single file diff.

LocalCommitsAvailability(repo_path: str, commits: list[str])

Context manager to check if commits are available locally before git operations.

Checks if specified commits exist locally using git fsck --root and fetches them if they're not present. This is useful for ensuring all required commits are available before performing git operations that depend on them.

Attributes:

Name Type Description
repo_path

Path to the git repository

commits

List of commit references to check and fetch if needed

Source code in src/lampe/core/tools/repository/management.py
219
220
221
222
223
def __init__(self, repo_path: str, commits: list[str]):
    # Record the repository location and the commit refs this context manager
    # must guarantee are present locally before git operations run.
    self.repo_path = repo_path
    self.commits = commits
    self.repo = Repo(path=repo_path)
    # Commits fetched on demand while the context is active — presumably used
    # for bookkeeping/cleanup; confirm against __enter__/__exit__.
    self._fetched_commits = []
TempGitRepository(repo_url: str, head_ref: str | None = None, base_ref: str | None = None, folder_name: str | None = None, sparse: bool = True, shallow: bool = True, blob_filter: bool = True, remove_existing: bool = True)

Context Manager for cloning and cleaning up a local clone of a repository

Uses partial clone optimizations including shallow clone, sparse checkout, and blob filtering to efficiently fetch only required content. Upon exit, will attempt to delete the cloned repository.

Attributes:

Name Type Description
repo_url

Repository URL to clone

head_ref

Optional head ref to check out.

folder_name

Optional name prefix for temp directory

sparse

Enable sparse checkout mode to avoid populating all files initially.

shallow

Enable shallow clone (depth=1) to fetch only the target commit.

blob_filter

Enable blob filtering (--filter=blob:none) to fetch file contents on-demand

remove_existing

Remove existing directory if it exists

Raises:

Type Description
RuntimeError

If Git version check fails

GitCommandError

If clone operation fails

UnableToDeleteError

If unable to delete the cloned repository

Source code in src/lampe/core/tools/repository/management.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def __init__(
    self,
    repo_url: str,
    head_ref: str | None = None,
    base_ref: str | None = None,
    folder_name: str | None = None,
    sparse: bool = True,
    shallow: bool = True,
    blob_filter: bool = True,
    remove_existing: bool = True,
):
    # Capture clone options without performing any git work; the actual clone
    # is deferred since this class is used as a context manager.
    self.repo_url = repo_url
    self.head_ref = head_ref
    self.base_ref = base_ref
    self.folder_name = folder_name
    self.sparse = sparse
    self.shallow = shallow
    self.blob_filter = blob_filter
    self.remove_existing = remove_existing
    # Local clone location; remains None until the clone succeeds.
    self.path_to_local_repo = None
clone_repo(repo_url: str, head_ref: str | None = None, base_ref: str | None = None, folder_name: str | None = None, sparse: bool = True, shallow: bool = True, blob_filter: bool = True, remove_existing: bool = True) -> str

Clone a repository optimized for PR review.

Uses partial clone optimizations including shallow clone, sparse checkout, and blob filtering to efficiently fetch only required content.

Parameters:

Name Type Description Default
repo_url str

Repository URL to clone

required
head_ref str | None

Head ref to checkout

None
base_ref str | None

Base ref to fetch for diff computation

None
folder_name str | None

Optional name prefix for temp directory

None
sparse bool

Enable sparse checkout mode to avoid populating all files initially

True
shallow bool

Enable shallow clone (depth=1) to fetch only the target commit

True
blob_filter bool

Enable blob filtering (--filter=blob:none) to fetch file contents on-demand

True
remove_existing bool

Remove existing directory if it exists

True

Returns:

Type Description
str

Path to the cloned repository

Raises:

Type Description
RuntimeError

If Git version check fails

GitCommandError

If clone operation fails

Source code in src/lampe/core/tools/repository/management.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
def clone_repo(
    repo_url: str,
    head_ref: str | None = None,
    base_ref: str | None = None,
    folder_name: str | None = None,
    sparse: bool = True,
    shallow: bool = True,
    blob_filter: bool = True,
    remove_existing: bool = True,
) -> str:
    """Clone a repository optimized for PR review.

    Uses partial clone optimizations including shallow clone, sparse checkout, and blob filtering
    to efficiently fetch only required content.

    Parameters
    ----------
    repo_url
        Repository URL to clone
    head_ref
        Head ref to checkout
    base_ref
        Base ref to fetch for diff computation
    folder_name
        Optional name prefix for temp directory
    sparse
        Enable sparse checkout mode to avoid populating all files initially
    shallow
        Enable shallow clone (depth=1) to fetch only the target commit
    blob_filter
        Enable blob filtering (--filter=blob:none) to fetch file contents on-demand
    remove_existing
        Remove existing directory if it exists

    Returns
    -------
    :
        Path to the cloned repository

    Raises
    ------
    RuntimeError
        If Git version check fails
    GitCommandError
        If clone operation fails
    """
    if not valid_git_version_available():
        raise RuntimeError("Git version check failed. Please upgrade Git to the minimum required version.")

    # A caller-supplied folder name pins the destination under /tmp; otherwise a
    # unique temp directory is created.
    tmp_dir = f"/tmp/{folder_name}" if folder_name else mkdtemp(prefix=str(uuid.uuid4()))
    logger.info(f"Cloning repo (sparse={sparse}, shallow={shallow}, blob_filter={blob_filter}) to {tmp_dir}")

    if os.path.exists(tmp_dir):
        if remove_existing:
            logger.info(f"Removing existing directory {tmp_dir}")
            shutil.rmtree(tmp_dir)
        else:
            # Reuse the existing checkout as-is.
            return tmp_dir

    clone_args = []
    if shallow:
        clone_args.extend(["--depth", "1"])
    if sparse:
        clone_args.append("--sparse")
    if blob_filter:
        clone_args.extend(["--filter", "blob:none"])
    if head_ref:
        clone_args.extend(["--revision", head_ref])

    try:
        repo = Repo.clone_from(repo_url, tmp_dir, multi_options=clone_args)
        repository_path = _repo_to_path(repo)
        if sparse and blob_filter:
            logger.info("Partial clone ready - file contents will be fetched on-demand during git operations")
        if base_ref:
            # Make the base ref available locally so diffs against it can be computed.
            fetch_commit_ref(repository_path, base_ref)
    except GitCommandError as e:
        logger.exception(f"Clone failed: {e}\nClone arguments used: {clone_args}")
        # Bare raise re-raises with the original traceback intact.
        raise

    return repository_path
fetch_commit_ref(repo_path: str, commit_ref: str) -> None

Fetch a base reference from the remote repository.

Parameters:

Name Type Description Default
repo_path str

Path to the git repository

required
commit_ref str

Commit reference to fetch (e.g., branch name, commit hash)

required

Raises:

Type Description
GitCommandError

If the fetch operation fails

Source code in src/lampe/core/tools/repository/management.py
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
def fetch_commit_ref(repo_path: str, commit_ref: str) -> None:
    """Fetch a single commit reference from ``origin`` into a local repository.

    The fetch is tag-free, shallow (depth 1), and blob-filtered, so only the
    minimum data needed to reference the commit is transferred.

    Parameters
    ----------
    repo_path
        Path to the git repository
    commit_ref
        Commit reference to fetch (e.g., branch name, commit hash)

    Raises
    ------
    GitCommandError
        If the fetch operation fails
    """
    fetch_options = ("--no-tags", "--depth=1", "--filter=blob:none")
    Repo(path=repo_path).git.fetch(*fetch_options, "origin", commit_ref)
find_files_by_pattern(pattern: str, repo_path: str = '/tmp/') -> str

Search for files using git ls-files and pattern matching.

Parameters:

Name Type Description Default
pattern str

Pattern to search for (e.g. "*.py", "src/**/*.md")

required
repo_path str

Path to git repository

'/tmp/'

Returns:

Type Description
str

Formatted string containing matching file paths

Source code in src/lampe/core/tools/repository/search.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def find_files_by_pattern(pattern: str, repo_path: str = "/tmp/") -> str:
    """Search for tracked files using git ls-files and pattern matching.

    Parameters
    ----------
    pattern
        Pattern to search for (e.g. "*.py", "src/**/*.md")
    repo_path
        Path to git repository

    Returns
    -------
    str
        Formatted string containing matching file paths
    """
    repo = Repo(path=repo_path)
    try:
        # git's own pathspec matching does the filtering for us.
        raw_listing = sanitize_utf8(repo.git.ls_files("--", pattern))
    except GitCommandError as e:
        logger.exception(f"Error finding files: {e}")
        return f"Error: {str(e)}"

    matches = raw_listing.splitlines()
    if not matches:
        return "No files found"

    joined = "\n".join(matches)
    return f"```shell\n{joined}\n```"
get_diff_between_commits(base_hash: str, head_hash: str = 'HEAD', files_exclude_patterns: list[str] | None = None, files_include_patterns: list[str] | None = None, files_reinclude_patterns: list[str] | None = None, batch_size: int = 50, include_line_numbers: bool = False, repo_path: str = '/tmp/') -> str

Get the diff between two commits, optionally filtering files by glob patterns.

The filtering is done in a specific order to ensure correct pattern application: 1. First, if include patterns are provided, only files matching those patterns are kept 2. Then, exclude patterns are applied to filter out matching files 3. Finally, reinclude patterns can override the exclude patterns to bring back specific files

This order ensures that reinclude patterns only affect files that were actually excluded, preventing the reinclude of files that weren't matched by include patterns in the first place.

Parameters:

Name Type Description Default
base_hash str

Base commit hash to compare from

required
head_hash str

Head commit hash to compare to. If not provided, uses HEAD

'HEAD'
files_exclude_patterns list[str] | None

List of glob patterns to exclude from the diff (relative to repo root). These patterns take precedence over include patterns.

None
files_include_patterns list[str] | None

List of glob patterns to include in the diff (relative to repo root). Note that exclude patterns will override these if there are conflicts.

None
files_reinclude_patterns list[str] | None

List of glob patterns to re-include files that were excluded by the exclude patterns. These patterns will only affect files that were previously excluded.

None
repo_path str

Path to the git repository

'/tmp/'
batch_size int

Number of files to process in each batch.

50
include_line_numbers bool

Whether to include line numbers in diff output (default: False)

False

Returns:

Type Description
str

Diff as a string

Raises:

Type Description
DiffNotFoundError

If there is an unexpected git error

Source code in src/lampe/core/tools/repository/diff.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
def get_diff_between_commits(
    base_hash: str,
    head_hash: str = "HEAD",
    files_exclude_patterns: list[str] | None = None,
    files_include_patterns: list[str] | None = None,
    files_reinclude_patterns: list[str] | None = None,
    batch_size: int = 50,
    include_line_numbers: bool = False,
    repo_path: str = "/tmp/",
) -> str:
    """Get the diff between two commits, optionally filtering files by glob patterns.

    The filtering is done in a specific order to ensure correct pattern application:
    1. First, if include patterns are provided, only files matching those patterns are kept
    2. Then, exclude patterns are applied to filter out matching files
    3. Finally, reinclude patterns can override the exclude patterns to bring back specific files

    This order ensures that reinclude patterns only affect files that were actually excluded,
    preventing the reinclude of files that weren't matched by include patterns in the first place.

    Parameters
    ----------
    base_hash
        Base commit hash to compare from
    head_hash
        Head commit hash to compare to. If not provided, uses HEAD
    files_exclude_patterns
        List of glob patterns to exclude from the diff (relative to repo root).
        These patterns take precedence over include patterns.
    files_include_patterns
        List of glob patterns to include in the diff (relative to repo root).
        Note that exclude patterns will override these if there are conflicts.
    files_reinclude_patterns
        List of glob patterns to re-include files that were excluded by the exclude patterns.
        These patterns will only affect files that were previously excluded.
    repo_path
        Path to the git repository
    batch_size
        Number of files to process in each batch.
    include_line_numbers
        Whether to include line numbers in diff output (default: False).
        Git diff output already carries line positions in its ``@@ -X,Y +A,B @@``
        hunk headers, so this flag currently requires no extra processing.

    Returns
    -------
    :
        Diff as a string

    Raises
    ------
    DiffNotFoundError
        If there is an unexpected git error
    """
    try:
        repo = Repo(path=repo_path)
        with LocalCommitsAvailability(repo_path, [base_hash, head_hash]):
            changed_files = repo.git.diff(base_hash, head_hash, "--name-only")

        # Warn when a pattern appears on both sides; exclude wins per git pathspec rules.
        if files_include_patterns and files_exclude_patterns:
            overlap = set(files_include_patterns) & set(files_exclude_patterns)
            if overlap:
                logger.warning(
                    f"Overlapping patterns found in include and exclude patterns: {overlap}. "
                    "Exclude patterns will take precedence as per git pathspec documentation."
                )

        filtered_files = []
        for f in changed_files.splitlines():
            # Include filter: drop files not matched by any include pattern.
            if files_include_patterns and not any(fnmatch(f, pat) for pat in files_include_patterns):
                continue
            # Exclude filter, with reinclude patterns able to rescue excluded files.
            if files_exclude_patterns and any(fnmatch(f, pat) for pat in files_exclude_patterns):
                if not (files_reinclude_patterns and any(fnmatch(f, pat) for pat in files_reinclude_patterns)):
                    continue
            filtered_files.append(f)

        # Diff in batches to keep individual git command lines small.
        diffs = []
        for batch in batched(filtered_files, batch_size):
            diff = repo.git.diff(base_hash, head_hash, "--", *batch)
            if diff:
                diffs.append(sanitize_utf8(diff))
        return "\n".join(diffs)
    except GitCommandError as e:
        logger.exception(f"Unexpected error getting diff: {e}")
        raise DiffNotFoundError(f"Diff not found for commits {base_hash} and {head_hash}") from e
get_diff_for_files(base_reference: str, file_paths: list[str] | None = None, head_reference: str = 'HEAD', repo_path: str = '/tmp/', batch_size: int = 50) -> str

Get the diff between two commits, optionally for specific files.

Parameters:

Name Type Description Default
base_reference str

Base commit reference (e.g., "main", commit hash)

required
file_paths list[str] | None

List of file paths to get diff for

None
head_reference str

Head commit reference (e.g., "feature", commit hash). Defaults to "HEAD"

'HEAD'
repo_path str

Path to git repository, by default "/tmp/"

'/tmp/'
batch_size int

Number of files to process in each batch.

50

Returns:

Type Description
str

Formatted string containing diffs for specified files or all changed files

Source code in src/lampe/core/tools/repository/diff.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
def get_diff_for_files(
    base_reference: str,
    file_paths: list[str] | None = None,
    head_reference: str = "HEAD",
    repo_path: str = "/tmp/",
    batch_size: int = 50,
) -> str:
    """Return the diff between two commits, optionally restricted to given files.

    Parameters
    ----------
    base_reference
        Base commit reference (e.g., "main", commit hash)
    file_paths
        List of file paths to get diff for
    head_reference
        Head commit reference (e.g., "feature", commit hash). Defaults to "HEAD"
    repo_path
        Path to git repository, by default "/tmp/"
    batch_size
        Number of files to process in each batch.

    Returns
    -------
    str
        Formatted string containing diffs for specified files or all changed files
    """
    repo = Repo(path=repo_path)
    with LocalCommitsAvailability(repo_path, [base_reference, head_reference]):
        if not file_paths:
            # No explicit selection: diff everything that changed.
            return sanitize_utf8(repo.git.diff(base_reference, head_reference))

        # Diff the requested paths in batches to keep git command lines short.
        collected: list[str] = []
        for batch_file_paths in batched(iterable=file_paths, n=batch_size):
            try:
                batch_diff = repo.git.diff(base_reference, head_reference, "--", *batch_file_paths)
            except GitCommandError:
                # Skip files that don't exist or can't be diffed
                logger.debug(f"Files {batch_file_paths} not found or can't be diffed in get_diff_for_files")
                continue
            if batch_diff:
                collected.append(sanitize_utf8(batch_diff))
        return "\n".join(collected)
get_file_content_at_commit(commit_hash: str, file_path: str, line_start: int | None = None, line_end: int | None = None, include_line_numbers: bool = False, repo_path: str = '/tmp/') -> str

Get file content from a specific commit.

Parameters:

Name Type Description Default
commit_hash str

Commit reference (e.g., "main", commit hash)

required
file_path str

Path to the file within the repository

required
line_start int | None

Line range start index (0-based) of head_content to extract content from

None
line_end int | None

Line range end index (0-based) of head_content to extract content to

None
include_line_numbers bool

Whether to prefix each line with its line number (default: False)

False
repo_path str

Path to the git repository, by default "/tmp/"

'/tmp/'

Returns:

Type Description
str

File content as a string, empty string if file doesn't exist or line range is invalid

Raises:

Type Description
GitCommandError

If the file doesn't exist or any other git error occurs

Source code in src/lampe/core/tools/repository/content.py
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def get_file_content_at_commit(
    commit_hash: str,
    file_path: str,
    line_start: int | None = None,
    line_end: int | None = None,
    include_line_numbers: bool = False,
    repo_path: str = "/tmp/",
) -> str:
    """Get file content from a specific commit.

    Parameters
    ----------
    commit_hash
        Commit reference (e.g., "main", commit hash)
    file_path
        Path to the file within the repository
    line_start
        Line range start index (0-based) of head_content to extract content from
    line_end
        Line range end index (0-based) of head_content to extract content to
    include_line_numbers
        Whether to prefix each line with its line number (default: False)
    repo_path
        Path to the git repository, by default "/tmp/"

    Returns
    -------
    :
        File content as a string, empty string if file doesn't exist or line range is invalid

    Raises
    ------
    GitCommandError
        If the file doesn't exist or any other git error occurs
    """
    try:
        # Guard against loading huge files wholesale when no line range is requested;
        # callers must page via line_start/line_end instead.
        if line_start is None and line_end is None:
            file_size = get_file_size_at_commit(file_path, commit_hash, repo_path)
            if file_size > MAX_FILE_SIZE_CHARS:
                logger.warning(f"File {file_path} at {commit_hash} is too large ({file_size} bytes)")
                return (
                    f"Error: File too large (>{MAX_FILE_SIZE_CHARS // 1000}KB). File size: {file_size} bytes. "
                    "Cannot read full file content. "
                    "Please use line_start and line_end parameters to read specific line ranges."
                )

        repo = Repo(path=repo_path)
        with LocalCommitsAvailability(repo_path, [commit_hash]):
            content = sanitize_utf8(repo.git.show(f"{commit_hash}:{file_path}"))

        # The slice is applied only when BOTH bounds are supplied (inclusive range).
        if line_start is not None and line_end is not None:
            content = "\n".join(content.splitlines()[line_start : line_end + 1])

        if include_line_numbers:
            # Numbering starts at line_start when given, otherwise at 0.
            first = line_start if line_start is not None else 0
            content = "\n".join(
                f"{first + offset:>6}| {text}" for offset, text in enumerate(content.splitlines())
            )

        return content
    except GitCommandError as e:
        logger.exception(f"Error getting file content: {e}")
        raise
is_sparse_clone(repo_path: str) -> bool

Check if a repository is a sparse clone.

A sparse clone is detected by checking multiple indicators: 1. If core.sparseCheckout is enabled 2. If .git/info/sparse-checkout file exists and has content

Parameters:

Name Type Description Default
repo_path str

Path to the git repository

required

Returns:

Type Description
bool

True if the repository appears to be a sparse clone, False otherwise

Raises:

Type Description
GitCommandError

If git commands fail

Source code in src/lampe/core/tools/repository/management.py
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
def is_sparse_clone(repo_path: str) -> bool:
    """Check if a repository is a sparse clone.

    A sparse clone is detected by checking multiple indicators:
    1. If core.sparseCheckout is enabled
    2. If .git/info/sparse-checkout file exists and has content

    Parameters
    ----------
    repo_path
        Path to the git repository

    Returns
    -------
    bool
        True if the repository appears to be a sparse clone, False otherwise.
        Any error during detection (including git command failures) is logged
        and reported as False rather than raised.
    """
    try:
        repo = Repo(path=repo_path)

        # Check if sparse checkout is enabled
        try:
            sparse_checkout = repo.git.config("core.sparseCheckout")
            if sparse_checkout.strip().lower() == "true":
                logger.debug(f"Sparse checkout enabled in {repo_path}")
                return True
        except GitCommandError:
            # core.sparseCheckout not set, continue with other checks
            pass

        # Check if .git/info/sparse-checkout file exists and has content
        sparse_checkout_file = Path(repo_path) / ".git" / "info" / "sparse-checkout"
        if sparse_checkout_file.exists():
            with open(sparse_checkout_file, "r") as f:
                content = f.read().strip()
                if content:
                    logger.debug(f"Sparse checkout file found with content in {repo_path}")
                    return True

        logger.debug(f"No sparse clone indicators found in {repo_path}")
        return False

    except Exception as e:
        # Best-effort detection: never propagate errors to the caller.
        logger.exception(f"Error checking if repository is sparse clone: {e}")
        return False
list_changed_files(base_reference: str, head_reference: str = 'HEAD', repo_path: str = '/tmp/') -> str

List files changed between base reference and HEAD, with change stats.

Parameters:

Name Type Description Default
base_reference str

Git reference (commit hash, branch name, etc.) to compare against HEAD

required
head_reference str

Git reference (commit hash, branch name, etc.) to compare against base reference. Defaults to "HEAD"

'HEAD'
repo_path str

Path to git repository, by default "/tmp/"

'/tmp/'

Returns:

Type Description
str

Formatted string listing changed files with status, additions/deletions and size Format: "[STATUS] filepath | +additions -deletions | sizeKB" STATUS is one of: A (added), D (deleted), M (modified)

Raises:

Type Description
GitCommandError

If there is an error executing git commands

Source code in src/lampe/core/tools/repository/diff.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def list_changed_files(base_reference: str, head_reference: str = "HEAD", repo_path: str = "/tmp/") -> str:
    """List files changed between base and head references, with change stats.

    Parameters
    ----------
    base_reference
        Git reference (commit hash, branch name, etc.) to compare against head_reference
    head_reference
        Git reference (commit hash, branch name, etc.) to compare against base reference. Defaults to "HEAD"
    repo_path
        Path to git repository, by default "/tmp/"

    Returns
    -------
    str
        Formatted string listing changed files with status, additions/deletions and size
        Format: "[STATUS] filepath | +additions -deletions | sizeKB"
        STATUS is one of: A (added), D (deleted), M (modified)

    Raises
    ------
    GitCommandError
        If there is an error executing git commands
    """
    repo = Repo(path=repo_path)
    # Pass head_reference explicitly so the requested head is honored
    # (previously it was silently ignored here, unlike list_changed_files_as_objects).
    numstat = repo.git.diff(base_reference, head_reference, "--numstat")
    status_output = repo.git.diff(base_reference, head_reference, "--name-status")

    # Map each path to a single-letter status; renames/copies collapse to "M".
    status_map = {}
    for line in status_output.splitlines():
        if line:
            parts = line.split("\t")
            if len(parts) >= 2:
                status, path = parts[0], parts[-1]
                status_map[path] = "A" if status == "A" else "D" if status == "D" else "M"

    result = []
    for line in numstat.splitlines():
        parts = line.split("\t")
        if len(parts) == 3:
            additions, deletions, file_path = parts
            # Binary files report "-" in numstat; treat those counts as 0.
            try:
                additions = int(additions)
            except ValueError:
                additions = 0
            try:
                deletions = int(deletions)
            except ValueError:
                deletions = 0
            try:
                size_kb = get_file_size_at_commit(file_path, head_reference, repo_path)
            except GitCommandError as e:
                size_kb = 0
                logger.exception(f"During list_changed_files, error getting file size: {e}, continuing...")

            status = status_map.get(file_path, "M")

            result.append(f"[{status}] {file_path} | +{additions} -{deletions} | {size_kb}KB")

    return "\n".join(sorted(result))
list_changed_files_as_objects(base_reference: str, head_reference: str = 'HEAD', repo_path: str = '/tmp/') -> list[FileDiffInfo]

List files changed between base reference and HEAD as structured objects.

Parameters:

Name Type Description Default
base_reference str

Git reference (commit hash, branch name, etc.) to compare against HEAD

required
head_reference str

Git reference (commit hash, branch name, etc.) to compare against base reference. Defaults to "HEAD"

'HEAD'
repo_path str

Path to git repository, by default "/tmp/"

'/tmp/'

Returns:

Type Description
list[FileDiffInfo]

List of FileDiffInfo objects for each changed file

Raises:

Type Description
GitCommandError

If there is an error executing git commands

Source code in src/lampe/core/tools/repository/diff.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
def list_changed_files_as_objects(
    base_reference: str, head_reference: str = "HEAD", repo_path: str = "/tmp/"
) -> list[FileDiffInfo]:
    """List files changed between base reference and HEAD as structured objects.

    Parameters
    ----------
    base_reference
        Git reference (commit hash, branch name, etc.) to compare against HEAD
    head_reference
        Git reference (commit hash, branch name, etc.) to compare against base reference. Defaults to "HEAD"
    repo_path
        Path to git repository, by default "/tmp/"

    Returns
    -------
    list[FileDiffInfo]
        List of FileDiffInfo objects for each changed file

    Raises
    ------
    GitCommandError
        If there is an error executing git commands
    """

    def _to_count(value: str) -> int:
        # Binary files report "-" in numstat; treat unparsable counts as 0.
        try:
            return int(value)
        except ValueError:
            return 0

    repo = Repo(path=repo_path)
    numstat_output = repo.git.diff(base_reference, head_reference, "--numstat")
    name_status_output = repo.git.diff(base_reference, head_reference, "--name-status")

    # Map each path to a single-letter status; anything other than a pure
    # add or delete (renames, copies, modifications) collapses to "M".
    statuses: dict[str, str] = {}
    for raw_line in name_status_output.splitlines():
        if not raw_line:
            continue
        fields = raw_line.split("\t")
        if len(fields) < 2:
            continue
        code, path = fields[0], fields[-1]
        statuses[path] = code if code in ("A", "D") else "M"

    infos: list[FileDiffInfo] = []
    for raw_line in numstat_output.splitlines():
        fields = raw_line.split("\t")
        if len(fields) != 3:
            continue
        added_str, removed_str, path = fields
        try:
            size_kb = get_file_size_at_commit(path, head_reference, repo_path)
        except GitCommandError as e:
            size_kb = 0
            logger.exception(f"During list_changed_files_as_objects, error getting file size: {e}, continuing...")

        infos.append(
            FileDiffInfo(
                file_path=path,
                status=statuses.get(path, "M"),
                additions=_to_count(added_str),
                deletions=_to_count(removed_str),
                size_kb=size_kb,
            )
        )

    return sorted(infos, key=lambda info: info.file_path)
list_directory_at_commit(relative_dir_path: str, commit_hash: str = 'HEAD', repo_path: str = '/tmp/') -> str

List directory contents at a specific commit (like ls).

Parameters:

Name Type Description Default
relative_dir_path str

Directory path relative to repository root (e.g. "src/", "." for repo root)

required
commit_hash str

Commit reference to list at (e.g., "main", commit hash). Defaults to "HEAD"

'HEAD'
repo_path str

Path to the git repository, by default "/tmp/"

'/tmp/'

Returns:

Type Description
str

Formatted listing of entries: type (blob/tree), name, path. One per line.

Source code in src/lampe/core/tools/repository/content.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
def list_directory_at_commit(
    relative_dir_path: str,
    commit_hash: str = "HEAD",
    repo_path: str = "/tmp/",
) -> str:
    """List directory contents at a specific commit (like ls).

    Parameters
    ----------
    relative_dir_path
        Directory path relative to repository root (e.g. "src/", "." for repo root)
    commit_hash
        Commit reference to list at (e.g., "main", commit hash). Defaults to "HEAD"
    repo_path
        Path to the git repository, by default "/tmp/"

    Returns
    -------
    str
        Formatted listing of entries: type (blob/tree), name, path. One per line.
    """
    try:
        repo = Repo(path=repo_path)
        with LocalCommitsAvailability(repo_path, [commit_hash]):
            normalized = (relative_dir_path or "").strip().rstrip("/") or "."
            target = commit_hash if normalized == "." else f"{commit_hash}:{normalized}"
            raw_listing = sanitize_utf8(repo.git.ls_tree(target))
        if not raw_listing.strip():
            return "Empty directory"
        rows = []
        for entry in raw_listing.splitlines():
            # Each entry looks like: "<mode> <type> <hash>\t<name>"
            pieces = entry.split("\t", 1)
            if len(pieces) != 2:
                rows.append(entry)
                continue
            meta, entry_name = pieces
            meta_fields = meta.split()
            entry_type = meta_fields[1] if len(meta_fields) >= 2 else "?"
            # NOTE(review): the path prefix is built from the raw argument rather
            # than `normalized` (no .strip()) — kept as-is to preserve behavior.
            dir_prefix = relative_dir_path.rstrip("/") or "."
            entry_path = entry_name if dir_prefix == "." else f"{dir_prefix}/{entry_name}"
            rows.append(f"{entry_type}\t{entry_name}\t{entry_path}")
        return "```\n" + "\n".join(rows) + "\n```"
    except GitCommandError as e:
        # git exits 128 for an unknown path / not-a-directory target.
        if e.status == 128:
            return f"Error: Path not found or not a directory at {commit_hash}"
        logger.exception(f"Error listing directory: {e}")
        return f"Error: {str(e)}"
search_in_files(pattern: str, relative_dir_path: str, commit_reference: str, include_line_numbers: bool = False, repo_path: str = '/tmp/') -> str

Search for a pattern in files within a directory at a specific commit.

Parameters:

Name Type Description Default
pattern str

Pattern to search for

required
relative_dir_path str

Directory path to search in

required
commit_reference str

Commit reference to search at

required
include_line_numbers bool

Whether to include line numbers in search results (default: False)

False
repo_path str

Path to the git repository, by default "/tmp/"

'/tmp/'

Returns:

Type Description
str

Search results as a string

Source code in src/lampe/core/tools/repository/search.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def search_in_files(
    pattern: str,
    relative_dir_path: str,
    commit_reference: str,
    include_line_numbers: bool = False,
    repo_path: str = "/tmp/",
) -> str:
    """Search for a pattern in files within a directory at a specific commit.

    Parameters
    ----------
    pattern
        Pattern to search for
    relative_dir_path
        Directory path to search in ("" or "." means the repository root)
    commit_reference
        Commit reference to search at (e.g., branch name or commit hash)
    include_line_numbers
        Whether to include line numbers in search results (default: False)
    repo_path
        Path to the git repository, by default "/tmp/"

    Returns
    -------
    str
        Matches wrapped in a ```grep fence, or "No matches found"
    """
    try:
        repo = Repo(path=repo_path)
        # Collapse empty / whitespace-only / trailing-slash inputs to "." (repo root)
        cleaned_dir = (relative_dir_path or "").strip().rstrip("/") or "."
        if cleaned_dir == ".":
            search_ref = commit_reference
        else:
            search_ref = f"{commit_reference}:{cleaned_dir}"
        grep_args = (["-n", pattern] if include_line_numbers else [pattern]) + [search_ref]
        result = repo.git.grep(*grep_args)
        if not result:
            return "No matches found"
        return f"```grep\n{sanitize_utf8(result)}\n```"
    except GitCommandError as e:
        # Exit 1 = no match (git grep documented behavior)
        if e.status == 1:
            return "No matches found"
        # 128 and other codes = real errors (invalid ref, path issues, etc.)
        return f"Error executing git grep: {str(e)}"
show_commit(commit_reference: str, repo_path: str = '/tmp/') -> str

Show the contents of a commit.

This function shows the contents of a commit, including the commit details and diffs.

Parameters:

Name Type Description Default
commit_reference str

Commit reference (e.g., "main", commit hash)

required
repo_path str

Path to git repository, by default "/tmp/"

'/tmp/'

Returns:

Type Description
str

Formatted string containing commit details and diffs

Source code in src/lampe/core/tools/repository/history.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def show_commit(commit_reference: str, repo_path: str = "/tmp/") -> str:
    """Show the contents of a commit.

    This function shows the contents of a commit, including the commit details and diffs.

    Parameters
    ----------
    commit_reference
        Commit reference (e.g., "main", commit hash)
    repo_path
        Path to git repository, by default "/tmp/"

    Returns
    -------
    str
        Formatted string containing commit details and diffs
    """
    repo = Repo(path=repo_path)
    commit = repo.commit(commit_reference)
    output = [
        f"Commit: {commit.hexsha}\n"
        f"Author: {commit.author}\n"
        f"Date: {commit.authored_datetime}\n"
        f"Message: {commit.message}\n"
        f"Files: {len(commit.stats.files)} files changed\n"
        f"Changes: +{commit.stats.total['insertions']} -{commit.stats.total['deletions']}\n"
        "Modified files:\n" + "\n".join(f"  - {f}" for f in commit.stats.files)
    ]
    if commit.parents:
        # Diff against the first parent only (merge commits ignore other parents)
        parent = commit.parents[0]
        diff = parent.diff(commit, create_patch=True)
    else:
        # Root commit: no parent to diff against
        diff = commit.diff(None, create_patch=True)
    for d in diff:
        output.append(f"\n--- {d.a_path}\n+++ {d.b_path}\n")
        if d.diff:
            # Bug fix: with create_patch=True, d.diff is bytes; str(d.diff)
            # produced the "b'...'" repr instead of the patch text. Decode
            # explicitly before sanitizing.
            diff_text = d.diff.decode("utf-8", errors="replace") if isinstance(d.diff, bytes) else d.diff
            output.append(sanitize_utf8(diff_text))
    return "".join(output)
content
file_exists(file_path: str, commit_hash: str = 'HEAD', repo_path: str = '/tmp/') -> bool

Check if a file exists in a specific commit.

Parameters:

Name Type Description Default
file_path str

Path to the file within the repository

required
commit_hash str

Commit reference to check (e.g., commit hash, branch name, tag). Defaults to "HEAD"

'HEAD'
repo_path str

Path to git repository, by default "/tmp/"

'/tmp/'

Returns:

Type Description
bool

True if file exists in the commit, False otherwise

Raises:

Type Description
GitCommandError

If there is an unexpected git error

Source code in src/lampe/core/tools/repository/content.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def file_exists(file_path: str, commit_hash: str = "HEAD", repo_path: str = "/tmp/") -> bool:
    """Check whether a file is present in a specific commit.

    Parameters
    ----------
    file_path
        Path to the file within the repository
    commit_hash
        Commit reference to check (e.g., commit hash, branch name, tag). Defaults to "HEAD"
    repo_path
        Path to git repository, by default "/tmp/"

    Returns
    -------
    bool
        True if file exists in the commit, False otherwise

    Raises
    ------
    GitCommandError
        If there is an unexpected git error
    """
    try:
        repository = Repo(path=repo_path)
        with LocalCommitsAvailability(repo_path, [commit_hash]):
            # cat-file -e exits with status 128 when the object is missing
            repository.git.cat_file("-e", f"{commit_hash}:{file_path}")
    except GitCommandError as err:
        if err.status == 128:
            return False
        logger.exception(f"Unexpected error checking if file exists: {err}")
        raise
    return True
get_file_content_at_commit(commit_hash: str, file_path: str, line_start: int | None = None, line_end: int | None = None, include_line_numbers: bool = False, repo_path: str = '/tmp/') -> str

Get file content from a specific commit.

Parameters:

Name Type Description Default
commit_hash str

Commit reference (e.g., "main", commit hash)

required
file_path str

Path to the file within the repository

required
line_start int | None

Line range start index (0-based) of head_content to extract content from

None
line_end int | None

Line range end index (0-based) of head_content to extract content to

None
include_line_numbers bool

Whether to prefix each line with its line number (default: False)

False
repo_path str

Path to the git repository, by default "/tmp/"

'/tmp/'

Returns:

Type Description
str

File content as a string, empty string if file doesn't exist or line range is invalid

Raises:

Type Description
GitCommandError

If the file doesn't exist or any other git error occurs

Source code in src/lampe/core/tools/repository/content.py
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def get_file_content_at_commit(
    commit_hash: str,
    file_path: str,
    line_start: int | None = None,
    line_end: int | None = None,
    include_line_numbers: bool = False,
    repo_path: str = "/tmp/",
) -> str:
    """Get file content from a specific commit.

    Parameters
    ----------
    commit_hash
        Commit reference (e.g., "main", commit hash)
    file_path
        Path to the file within the repository
    line_start
        Line range start index (0-based, inclusive) to extract content from
    line_end
        Line range end index (0-based, inclusive) to extract content to
    include_line_numbers
        Whether to prefix each line with its line number (default: False)
    repo_path
        Path to the git repository, by default "/tmp/"

    Returns
    -------
    :
        File content as a string, empty string if file doesn't exist or line range is invalid

    Raises
    ------
    GitCommandError
        If the file doesn't exist or any other git error occurs
    """
    try:
        if line_start is None and line_end is None:
            # Only full-file reads are size-guarded; ranged reads bypass the check
            file_size = get_file_size_at_commit(file_path, commit_hash, repo_path)
            if file_size > MAX_FILE_SIZE_CHARS:
                logger.warning(f"File {file_path} at {commit_hash} is too large ({file_size} bytes)")
                return (
                    f"Error: File too large (>{MAX_FILE_SIZE_CHARS // 1000}KB). File size: {file_size} bytes. "
                    "Cannot read full file content. "
                    "Please use line_start and line_end parameters to read specific line ranges."
                )

        repo = Repo(path=repo_path)
        with LocalCommitsAvailability(repo_path, [commit_hash]):
            content = sanitize_utf8(repo.git.show(f"{commit_hash}:{file_path}"))

        if line_start is not None and line_end is not None:
            # Both bounds are inclusive, 0-based indices into the file's lines
            content = "\n".join(content.splitlines()[line_start : line_end + 1])

        if include_line_numbers:
            # Numbering starts at line_start so numbers match positions in the full file
            first = 0 if line_start is None else line_start
            content = "\n".join(
                f"{first + offset:>6}| {text}" for offset, text in enumerate(content.splitlines())
            )

        return content
    except GitCommandError as e:
        logger.exception(f"Error getting file content: {e}")
        raise
get_file_size_at_commit(file_path: str, commit_hash: str = 'HEAD', repo_path: str = '/tmp/') -> int

Get the size of a file at a specific commit.

Parameters:

Name Type Description Default
file_path str

Path to the file within the repository

required
commit_hash str

Commit reference (e.g., "main", commit hash). Defaults to "HEAD"

'HEAD'
repo_path str

Path to the git repository, by default "/tmp/"

'/tmp/'

Returns:

Type Description
int

Size of the file in bytes

Source code in src/lampe/core/tools/repository/content.py
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
def get_file_size_at_commit(file_path: str, commit_hash: str = "HEAD", repo_path: str = "/tmp/") -> int:
    """Get the size of a file at a specific commit.

    Parameters
    ----------
    file_path
        Path to the file within the repository
    commit_hash
        Commit reference (e.g., "main", commit hash). Defaults to "HEAD"
    repo_path
        Path to the git repository, by default "/tmp/"

    Returns
    -------
    :
        Size of the file in bytes (0 when the path is absent from the commit)
    """
    repository = Repo(path=repo_path)
    with LocalCommitsAvailability(repo_path, [commit_hash]):
        commit_tree = repository.commit(rev=commit_hash).tree
    try:
        return commit_tree[file_path].size
    except KeyError:
        # Path is not present in this commit's tree
        return 0
list_directory_at_commit(relative_dir_path: str, commit_hash: str = 'HEAD', repo_path: str = '/tmp/') -> str

List directory contents at a specific commit (like ls).

Parameters:

Name Type Description Default
relative_dir_path str

Directory path relative to repository root (e.g. "src/", "." for repo root)

required
commit_hash str

Commit reference to list at (e.g., "main", commit hash). Defaults to "HEAD"

'HEAD'
repo_path str

Path to the git repository, by default "/tmp/"

'/tmp/'

Returns:

Type Description
str

Formatted listing of entries: type (blob/tree), name, path. One per line.

Source code in src/lampe/core/tools/repository/content.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
def list_directory_at_commit(
    relative_dir_path: str,
    commit_hash: str = "HEAD",
    repo_path: str = "/tmp/",
) -> str:
    """List directory contents at a specific commit (like ls).

    Parameters
    ----------
    relative_dir_path
        Directory path relative to repository root (e.g. "src/", "." for repo root)
    commit_hash
        Commit reference to list at (e.g., "main", commit hash). Defaults to "HEAD"
    repo_path
        Path to the git repository, by default "/tmp/"

    Returns
    -------
    str
        Formatted listing of entries: type (blob/tree), name, path. One per line.
    """
    try:
        repo = Repo(path=repo_path)
        with LocalCommitsAvailability(repo_path, [commit_hash]):
            # Collapse empty / whitespace-only / trailing-slash inputs to "." (repo root)
            normalized = (relative_dir_path or "").strip().rstrip("/") or "."
            tree_ref = commit_hash if normalized == "." else f"{commit_hash}:{normalized}"
            ls_output = sanitize_utf8(repo.git.ls_tree(tree_ref))
        if not ls_output.strip():
            return "Empty directory"
        lines = []
        for line in ls_output.splitlines():
            # ls-tree format: <mode> <type> <hash><tab><name>
            parts = line.split("\t", 1)
            if len(parts) == 2:
                header, name = parts
                header_fields = header.split()
                obj_type = header_fields[1] if len(header_fields) >= 2 else "?"
                # Bug fix: reuse the normalized directory instead of re-deriving it
                # from the raw argument, which crashed on None and ignored the
                # whitespace stripping applied above.
                full_path = name if normalized == "." else f"{normalized}/{name}"
                lines.append(f"{obj_type}\t{name}\t{full_path}")
            else:
                lines.append(line)
        return "```\n" + "\n".join(lines) + "\n```"
    except GitCommandError as e:
        if e.status == 128:
            return f"Error: Path not found or not a directory at {commit_hash}"
        logger.exception(f"Error listing directory: {e}")
        return f"Error: {str(e)}"
diff
FileDiffInfo

Bases: BaseModel

Information about a single file diff.

get_diff_between_commits(base_hash: str, head_hash: str = 'HEAD', files_exclude_patterns: list[str] | None = None, files_include_patterns: list[str] | None = None, files_reinclude_patterns: list[str] | None = None, batch_size: int = 50, include_line_numbers: bool = False, repo_path: str = '/tmp/') -> str

Get the diff between two commits, optionally filtering files by glob patterns.

The filtering is done in a specific order to ensure correct pattern application: 1. First, if include patterns are provided, only files matching those patterns are kept 2. Then, exclude patterns are applied to filter out matching files 3. Finally, reinclude patterns can override the exclude patterns to bring back specific files

This order ensures that reinclude patterns only affect files that were actually excluded, preventing the reinclude of files that weren't matched by include patterns in the first place.

Parameters:

Name Type Description Default
base_hash str

Base commit hash to compare from

required
head_hash str

Head commit hash to compare to. If not provided, uses HEAD

'HEAD'
files_exclude_patterns list[str] | None

List of glob patterns to exclude from the diff (relative to repo root). These patterns take precedence over include patterns.

None
files_include_patterns list[str] | None

List of glob patterns to include in the diff (relative to repo root). Note that exclude patterns will override these if there are conflicts.

None
files_reinclude_patterns list[str] | None

List of glob patterns to re-include files that were excluded by the exclude patterns. These patterns will only affect files that were previously excluded.

None
repo_path str

Path to the git repository

'/tmp/'
batch_size int

Number of files to process in each batch.

50
include_line_numbers bool

Whether to include line numbers in diff output (default: False)

False

Returns:

Type Description
str

Diff as a string

Raises:

Type Description
DiffNotFoundError

If there is an unexpected git error

Source code in src/lampe/core/tools/repository/diff.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
def get_diff_between_commits(
    base_hash: str,
    head_hash: str = "HEAD",
    files_exclude_patterns: list[str] | None = None,
    files_include_patterns: list[str] | None = None,
    files_reinclude_patterns: list[str] | None = None,
    batch_size: int = 50,
    include_line_numbers: bool = False,
    repo_path: str = "/tmp/",
) -> str:
    """Get the diff between two commits, optionally filtering files by glob patterns.

    The filtering is done in a specific order to ensure correct pattern application:
    1. First, if include patterns are provided, only files matching those patterns are kept
    2. Then, exclude patterns are applied to filter out matching files
    3. Finally, reinclude patterns can override the exclude patterns to bring back specific files

    This order ensures that reinclude patterns only affect files that were actually excluded,
    preventing the reinclude of files that weren't matched by include patterns in the first place.

    Parameters
    ----------
    base_hash
        Base commit hash to compare from
    head_hash
        Head commit hash to compare to. If not provided, uses HEAD
    files_exclude_patterns
        List of glob patterns to exclude from the diff (relative to repo root).
        These patterns take precedence over include patterns.
    files_include_patterns
        List of glob patterns to include in the diff (relative to repo root).
        Note that exclude patterns will override these if there are conflicts.
    files_reinclude_patterns
        List of glob patterns to re-include files that were excluded by the exclude patterns.
        These patterns will only affect files that were previously excluded.
    repo_path
        Path to the git repository
    batch_size
        Number of files to process in each batch.
    include_line_numbers
        Accepted for interface parity; currently a no-op because git diff hunk
        headers (@@ -X,Y +A,B @@) already carry line positions.

    Returns
    -------
    :
        Diff as a string

    Raises
    ------
    DiffNotFoundError
        If there is an unexpected git error
    """
    try:
        repo = Repo(path=repo_path)
        changed_files = ""
        with LocalCommitsAvailability(repo_path, [base_hash, head_hash]):
            changed_files = repo.git.diff(base_hash, head_hash, "--name-only")

        if files_include_patterns and files_exclude_patterns:
            overlap = set(files_include_patterns) & set(files_exclude_patterns)
            if overlap:
                logger.warning(
                    f"Overlapping patterns found in include and exclude patterns: {overlap}. "
                    "Exclude patterns will take precedence as per git pathspec documentation."
                )

        # Apply the documented order: include -> exclude -> reinclude
        # (reinclude only rescues files that the exclude step dropped).
        filtered_files = []
        for f in changed_files.splitlines():
            if files_include_patterns and not any(fnmatch(f, pat) for pat in files_include_patterns):
                continue
            if files_exclude_patterns and any(fnmatch(f, pat) for pat in files_exclude_patterns):
                if not (files_reinclude_patterns and any(fnmatch(f, pat) for pat in files_reinclude_patterns)):
                    continue
            filtered_files.append(f)

        # Diff in batches to keep the git command line within OS argument limits.
        # (Removed a dead `elif include_line_numbers: pass` branch that had no effect.)
        diffs = []
        for batch in batched(filtered_files, batch_size):
            diff = repo.git.diff(base_hash, head_hash, "--", *batch)
            if diff:
                diffs.append(sanitize_utf8(diff))
        return "\n".join(diffs)
    except GitCommandError as e:
        logger.exception(f"Unexpected error getting diff: {e}")
        raise DiffNotFoundError(f"Diff not found for commits {base_hash} and {head_hash}") from e
get_diff_for_files(base_reference: str, file_paths: list[str] | None = None, head_reference: str = 'HEAD', repo_path: str = '/tmp/', batch_size: int = 50) -> str

Get the diff between two commits, optionally for specific files.

Parameters:

Name Type Description Default
base_reference str

Base commit reference (e.g., "main", commit hash)

required
file_paths list[str] | None

List of file paths to get diff for

None
head_reference str

Head commit reference (e.g., "feature", commit hash). Defaults to "HEAD"

'HEAD'
repo_path str

Path to git repository, by default "/tmp/"

'/tmp/'
batch_size int

Number of files to process in each batch.

50

Returns:

Type Description
str

Formatted string containing diffs for specified files or all changed files

Source code in src/lampe/core/tools/repository/diff.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
def get_diff_for_files(
    base_reference: str,
    file_paths: list[str] | None = None,
    head_reference: str = "HEAD",
    repo_path: str = "/tmp/",
    batch_size: int = 50,
) -> str:
    """Get the diff between two commits, optionally for specific files.

    Parameters
    ----------
    base_reference
        Base commit reference (e.g., "main", commit hash)
    file_paths
        List of file paths to get diff for; None/empty means all changed files
    head_reference
        Head commit reference (e.g., "feature", commit hash). Defaults to "HEAD"
    repo_path
        Path to git repository, by default "/tmp/"
    batch_size
        Number of files to process in each batch.

    Returns
    -------
    str
        Formatted string containing diffs for specified files or all changed files
    """
    repo = Repo(path=repo_path)
    with LocalCommitsAvailability(repo_path, [base_reference, head_reference]):
        if not file_paths:
            # No file filter: diff everything that changed between the two refs
            return sanitize_utf8(repo.git.diff(base_reference, head_reference))

        # Diff in batches to keep the git command line within OS argument limits
        collected = []
        for batch_file_paths in batched(iterable=file_paths, n=batch_size):
            try:
                batch_diff = repo.git.diff(base_reference, head_reference, "--", *batch_file_paths)
            except GitCommandError:
                # Skip files that don't exist or can't be diffed
                logger.debug(f"Files {batch_file_paths} not found or can't be diffed in get_diff_for_files")
                continue
            if batch_diff:
                collected.append(sanitize_utf8(batch_diff))
        return "\n".join(collected)
list_changed_files(base_reference: str, head_reference: str = 'HEAD', repo_path: str = '/tmp/') -> str

List files changed between base reference and HEAD, with change stats.

Parameters:

Name Type Description Default
base_reference str

Git reference (commit hash, branch name, etc.) to compare against HEAD

required
head_reference str

Git reference (commit hash, branch name, etc.) to compare against base reference. Defaults to "HEAD"

'HEAD'
repo_path str

Path to git repository, by default "/tmp/"

'/tmp/'

Returns:

Type Description
str

Formatted string listing changed files with status, additions/deletions and size Format: "[STATUS] filepath | +additions -deletions | sizeKB" STATUS is one of: A (added), D (deleted), M (modified)

Raises:

Type Description
GitCommandError

If there is an error executing git commands

Source code in src/lampe/core/tools/repository/diff.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def list_changed_files(base_reference: str, head_reference: str = "HEAD", repo_path: str = "/tmp/") -> str:
    """List files changed between base reference and head reference, with change stats.

    Parameters
    ----------
    base_reference
        Git reference (commit hash, branch name, etc.) to compare against head_reference
    head_reference
        Git reference (commit hash, branch name, etc.) to compare against base reference. Defaults to "HEAD"
    repo_path
        Path to git repository, by default "/tmp/"

    Returns
    -------
    str
        Formatted string listing changed files with status, additions/deletions and size
        Format: "[STATUS] filepath | +additions -deletions | sizeKB"
        STATUS is one of: A (added), D (deleted), M (modified)

    Raises
    ------
    GitCommandError
        If there is an error executing git commands
    """
    repo = Repo(path=repo_path)
    # Bug fix: the diffs previously omitted head_reference (comparing base against
    # the working tree), inconsistent with list_changed_files_as_objects and with
    # the head_reference parameter used below for file sizes.
    numstat = repo.git.diff(base_reference, head_reference, "--numstat")
    status_output = repo.git.diff(base_reference, head_reference, "--name-status")

    status_map = {}
    for line in status_output.splitlines():
        if line:
            parts = line.split("\t")
            if len(parts) >= 2:
                # Renames/copies carry extra path fields; the last field is the new path
                status, path = parts[0], parts[-1]
                status_map[path] = "A" if status == "A" else "D" if status == "D" else "M"

    result = []
    for line in numstat.splitlines():
        parts = line.split("\t")
        if len(parts) == 3:
            additions, deletions, file_path = parts
            try:
                additions = int(additions)
            except ValueError:
                # numstat emits "-" for binary files
                additions = 0
            try:
                deletions = int(deletions)
            except ValueError:
                deletions = 0
            try:
                size_kb = get_file_size_at_commit(file_path, head_reference, repo_path)
            except GitCommandError as e:
                size_kb = 0
                logger.exception(f"During list_changed_files, error getting file size: {e}, continuing...")

            status = status_map.get(file_path, "M")

            # NOTE(review): get_file_size_at_commit returns bytes, yet the label says
            # KB — value kept as-is for output compatibility; confirm intended unit.
            result.append(f"[{status}] {file_path} | +{additions} -{deletions} | {size_kb}KB")

    return "\n".join(sorted(result))
list_changed_files_as_objects(base_reference: str, head_reference: str = 'HEAD', repo_path: str = '/tmp/') -> list[FileDiffInfo]

List files changed between base reference and HEAD as structured objects.

Parameters:

Name Type Description Default
base_reference str

Git reference (commit hash, branch name, etc.) to compare against HEAD

required
head_reference str

Git reference (commit hash, branch name, etc.) to compare against base reference. Defaults to "HEAD"

'HEAD'
repo_path str

Path to git repository, by default "/tmp/"

'/tmp/'

Returns:

Type Description
list[FileDiffInfo]

List of FileDiffInfo objects for each changed file

Raises:

Type Description
GitCommandError

If there is an error executing git commands

Source code in src/lampe/core/tools/repository/diff.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
def list_changed_files_as_objects(
    base_reference: str, head_reference: str = "HEAD", repo_path: str = "/tmp/"
) -> list[FileDiffInfo]:
    """List files changed between base reference and HEAD as structured objects.

    Parameters
    ----------
    base_reference
        Git reference (commit hash, branch name, etc.) to compare against HEAD
    head_reference
        Git reference (commit hash, branch name, etc.) to compare against base reference. Defaults to "HEAD"
    repo_path
        Path to git repository, by default "/tmp/"

    Returns
    -------
    list[FileDiffInfo]
        List of FileDiffInfo objects for each changed file

    Raises
    ------
    GitCommandError
        If there is an error executing git commands
    """
    repo = Repo(path=repo_path)
    numstat_output = repo.git.diff(base_reference, head_reference, "--numstat")
    name_status_output = repo.git.diff(base_reference, head_reference, "--name-status")

    def _to_int(value: str) -> int:
        # numstat emits "-" for binary files; treat it as zero
        try:
            return int(value)
        except ValueError:
            return 0

    # Map each path to a coarse status: A (added), D (deleted), otherwise M
    statuses: dict[str, str] = {}
    for raw in name_status_output.splitlines():
        if not raw:
            continue
        fields = raw.split("\t")
        if len(fields) < 2:
            continue
        # Renames/copies carry extra path fields; the last field is the new path
        code, path = fields[0], fields[-1]
        statuses[path] = code if code in ("A", "D") else "M"

    infos: list[FileDiffInfo] = []
    for raw in numstat_output.splitlines():
        fields = raw.split("\t")
        if len(fields) != 3:
            continue
        added_str, removed_str, file_path = fields
        try:
            size_kb = get_file_size_at_commit(file_path, head_reference, repo_path)
        except GitCommandError as e:
            size_kb = 0
            logger.exception(f"During list_changed_files_as_objects, error getting file size: {e}, continuing...")

        infos.append(
            FileDiffInfo(
                file_path=file_path,
                status=statuses.get(file_path, "M"),
                additions=_to_int(added_str),
                deletions=_to_int(removed_str),
                size_kb=size_kb,
            )
        )

    return sorted(infos, key=lambda info: info.file_path)
encoding

Encoding utilities for git command outputs.

sanitize_utf8(text: str) -> str

Sanitize a string to ensure it contains only valid UTF-8 characters.

This function handles surrogate pairs and other invalid UTF-8 sequences that can occur when processing file content from git commands. Surrogate pairs are common in binary files or files with incorrect encoding.

The function uses 'replace' error handling: characters that cannot be encoded to UTF-8 (such as lone surrogates) are replaced with '?' during encoding, and any remaining invalid byte sequences are replaced with the Unicode replacement character (U+FFFD) during decoding.

Parameters:

Name Type Description Default
text str

The text to sanitize (may contain surrogate pairs or invalid UTF-8)

required

Returns:

Type Description
str

Sanitized text containing only valid UTF-8 characters

Examples:

>>> sanitize_utf8("Valid text")
'Valid text'
>>> sanitize_utf8("Text with surrogates: \udcff\udcfe")
'Text with surrogates: ??'
Source code in src/lampe/core/tools/repository/encoding.py
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def sanitize_utf8(text: str) -> str:
    """
    Sanitize a string to ensure it contains only valid UTF-8 characters.

    This function handles surrogate pairs and other invalid UTF-8 sequences
    that can occur when processing file content from git commands. Surrogate
    pairs are common in binary files or files with incorrect encoding.

    The function uses 'replace' error handling: characters that cannot be
    encoded to UTF-8 (such as lone surrogates) are replaced with '?' during
    encoding, and any remaining invalid byte sequences are replaced with the
    Unicode replacement character (U+FFFD) during decoding.

    Parameters
    ----------
    text : str
        The text to sanitize (may contain surrogate pairs or invalid UTF-8)

    Returns
    -------
    str
        Sanitized text containing only valid UTF-8 characters

    Examples
    --------
    >>> sanitize_utf8("Valid text")
    'Valid text'
    >>> sanitize_utf8("Text with surrogates: \\udcff\\udcfe")
    'Text with surrogates: ??'
    """
    if not text:
        return text

    # str.encode with errors="replace" substitutes '?' for each unencodable
    # character (e.g. a lone surrogate); the decode pass with errors="replace"
    # maps any invalid byte sequences to U+FFFD.
    # (Bug fix: the previous docstring/doctest claimed U+FFFD replacement and
    # an expected output that the code never produced.)
    return text.encode("utf-8", errors="replace").decode("utf-8", errors="replace")
history
get_commit_log(max_count: int, repo_path: str = '/tmp/') -> str

Get the log of commits for a repository.

This function gets the log of commits for a repository, including the commit details and the list of files path that were changed.

Parameters:

Name Type Description Default
max_count int

Maximum number of commits to return

required
repo_path str

Path to git repository, by default "/tmp/"

'/tmp/'

Returns:

Type Description
str

Formatted string containing commit details and list of files that were changed

Source code in src/lampe/core/tools/repository/history.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def get_commit_log(max_count: int, repo_path: str = "/tmp/") -> str:
    """Return a formatted log of the most recent commits in a repository.

    Each entry contains the commit metadata (hash, author, date, message),
    aggregate change statistics, and the paths of the modified files.

    Parameters
    ----------
    max_count
        Maximum number of commits to return
    repo_path
        Path to git repository, by default "/tmp/"

    Returns
    -------
    str
        Formatted string containing commit details and list of files that were changed
    """
    repository = Repo(path=repo_path)
    entries = []
    for commit in repository.iter_commits(max_count=max_count):
        # commit.stats is computed per access; read it once per commit.
        stats = commit.stats
        file_listing = "\n".join(f"  - {path}" for path in stats.files)
        entries.append(
            f"Commit: {commit.hexsha}\n"
            f"Author: {commit.author}\n"
            f"Date: {commit.authored_datetime}\n"
            f"Message: {commit.message}\n"
            f"Files: {len(stats.files)} files changed\n"
            f"Changes: +{stats.total['insertions']} -{stats.total['deletions']}\n"
            f"Modified files:\n" + file_listing
        )
    return "\n".join(entries)
show_commit(commit_reference: str, repo_path: str = '/tmp/') -> str

Show the contents of a commit.

This function shows the contents of a commit, including the commit details and diffs.

Parameters:

Name Type Description Default
commit_reference str

Commit reference (e.g., "main", commit hash)

required
repo_path str

Path to git repository, by default "/tmp/"

'/tmp/'

Returns:

Type Description
str

Formatted string containing commit details and diffs

Source code in src/lampe/core/tools/repository/history.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def show_commit(commit_reference: str, repo_path: str = "/tmp/") -> str:
    """Show the contents of a commit.

    This function shows the contents of a commit, including the commit details and diffs.

    Parameters
    ----------
    commit_reference
        Commit reference (e.g., "main", commit hash)
    repo_path
        Path to git repository, by default "/tmp/"

    Returns
    -------
    str
        Formatted string containing commit details and diffs
    """
    repo = Repo(path=repo_path)
    commit = repo.commit(commit_reference)
    output = [
        f"Commit: {commit.hexsha}\n"
        f"Author: {commit.author}\n"
        f"Date: {commit.authored_datetime}\n"
        f"Message: {commit.message}\n"
        f"Files: {len(commit.stats.files)} files changed\n"
        f"Changes: +{commit.stats.total['insertions']} -{commit.stats.total['deletions']}\n"
        f"Modified files:\n" + "\n".join(f"  - {f}" for f in commit.stats.files)
    ]
    if commit.parents:
        parent = commit.parents[0]
        diff = parent.diff(commit, create_patch=True)
    else:
        # Root commit has no parent to diff against.
        # NOTE(review): diff(None) compares against the working tree, not the
        # empty tree — confirm this is the intended output for initial commits.
        diff = commit.diff(None, create_patch=True)
    for d in diff:
        output.append(f"\n--- {d.a_path}\n+++ {d.b_path}\n")
        if d.diff:
            # With create_patch=True, d.diff is bytes. Decode it properly:
            # str() on bytes would embed the b'...' repr (with escape
            # sequences) into the output instead of the actual patch text.
            raw_patch = d.diff
            diff_str = raw_patch.decode("utf-8", errors="replace") if isinstance(raw_patch, bytes) else raw_patch
            output.append(sanitize_utf8(diff_str))
    return "".join(output)
management
LocalCommitsAvailability(repo_path: str, commits: list[str])

Context manager to check if commits are available locally before git operations.

Checks if specified commits exist locally using git fsck --root and fetches them if they're not present. This is useful for ensuring all required commits are available before performing git operations that depend on them.

Attributes:

Name Type Description
repo_path

Path to the git repository

commits

List of commit references to check and fetch if needed

Source code in src/lampe/core/tools/repository/management.py
219
220
221
222
223
def __init__(self, repo_path: str, commits: list[str]):
    """Record the repository and the commit refs to verify/fetch.

    Parameters
    ----------
    repo_path
        Path to the git repository
    commits
        Commit references to check locally and fetch if absent
    """
    self.repo_path = repo_path
    self.commits = commits
    self.repo = Repo(path=repo_path)
    # Commits fetched by this manager; starts empty — presumably populated
    # when the context is entered (not visible here, confirm in __enter__).
    self._fetched_commits = []
TempGitRepository(repo_url: str, head_ref: str | None = None, base_ref: str | None = None, folder_name: str | None = None, sparse: bool = True, shallow: bool = True, blob_filter: bool = True, remove_existing: bool = True)

Context Manager for cloning and cleaning up a local clone of a repository

Uses partial clone optimizations including shallow clone, sparse checkout, and blob filtering to efficiently fetch only required content. Upon exit, will attempt to delete the cloned repository.

Attributes:

Name Type Description
repo_url

Repository URL to clone

head_ref

Optional head ref to check out.

folder_name

Optional name prefix for temp directory

sparse

Enable sparse checkout mode to avoid populating all files initially.

shallow

Enable shallow clone (depth=1) to fetch only the target commit.

blob_filter

Enable blob filtering (--filter=blob:none) to fetch file contents on-demand

remove_existing

Remove existing directory if it exists

Raises:

Type Description
RuntimeError

If Git version check fails

GitCommandError

If clone operation fails

UnableToDeleteError

If unable to delete the cloned repository

Source code in src/lampe/core/tools/repository/management.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def __init__(
    self,
    repo_url: str,
    head_ref: str | None = None,
    base_ref: str | None = None,
    folder_name: str | None = None,
    sparse: bool = True,
    shallow: bool = True,
    blob_filter: bool = True,
    remove_existing: bool = True,
):
    """Store the clone options; no clone happens until the context is entered.

    Parameters
    ----------
    repo_url
        Repository URL to clone
    head_ref
        Optional head ref to check out
    base_ref
        Optional base ref to fetch for diff computation
    folder_name
        Optional name prefix for the temp directory
    sparse
        Enable sparse checkout mode
    shallow
        Enable shallow clone (depth=1)
    blob_filter
        Enable blob filtering (--filter=blob:none)
    remove_existing
        Remove an existing directory if it exists
    """
    self.repo_url = repo_url
    self.head_ref = head_ref
    self.base_ref = base_ref
    self.folder_name = folder_name
    self.sparse = sparse
    self.shallow = shallow
    self.blob_filter = blob_filter
    self.remove_existing = remove_existing
    # Filled in once the repository has been cloned (presumably in
    # __enter__ — not visible in this chunk).
    self.path_to_local_repo = None
clone_repo(repo_url: str, head_ref: str | None = None, base_ref: str | None = None, folder_name: str | None = None, sparse: bool = True, shallow: bool = True, blob_filter: bool = True, remove_existing: bool = True) -> str

Clone a repository optimized for PR review.

Uses partial clone optimizations including shallow clone, sparse checkout, and blob filtering to efficiently fetch only required content.

Parameters:

Name Type Description Default
repo_url str

Repository URL to clone

required
head_ref str | None

Head ref to checkout

None
base_ref str | None

Base ref to fetch for diff computation

None
folder_name str | None

Optional name prefix for temp directory

None
sparse bool

Enable sparse checkout mode to avoid populating all files initially

True
shallow bool

Enable shallow clone (depth=1) to fetch only the target commit

True
blob_filter bool

Enable blob filtering (--filter=blob:none) to fetch file contents on-demand

True
remove_existing bool

Remove existing directory if it exists

True

Returns:

Type Description
str

Path to the cloned repository

Raises:

Type Description
RuntimeError

If Git version check fails

GitCommandError

If clone operation fails

Source code in src/lampe/core/tools/repository/management.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
def clone_repo(
    repo_url: str,
    head_ref: str | None = None,
    base_ref: str | None = None,
    folder_name: str | None = None,
    sparse: bool = True,
    shallow: bool = True,
    blob_filter: bool = True,
    remove_existing: bool = True,
) -> str:
    """Clone a repository optimized for PR review.

    Uses partial clone optimizations including shallow clone, sparse checkout, and blob filtering
    to efficiently fetch only required content.

    Parameters
    ----------
    repo_url
        Repository URL to clone
    head_ref
        Head ref to checkout
    base_ref
        Base ref to fetch for diff computation
    folder_name
        Optional name prefix for temp directory
    sparse
        Enable sparse checkout mode to avoid populating all files initially
    shallow
        Enable shallow clone (depth=1) to fetch only the target commit
    blob_filter
        Enable blob filtering (--filter=blob:none) to fetch file contents on-demand
    remove_existing
        Remove existing directory if it exists

    Returns
    -------
    :
        Path to the cloned repository

    Raises
    ------
    RuntimeError
        If Git version check fails
    GitCommandError
        If clone operation fails
    """
    if not valid_git_version_available():
        raise RuntimeError("Git version check failed. Please upgrade Git to the minimum required version.")

    tmp_dir = f"/tmp/{folder_name}" if folder_name else mkdtemp(prefix=str(uuid.uuid4()))
    logger.info(f"Cloning repo (sparse={sparse}, shallow={shallow}, blob_filter={blob_filter}) to {tmp_dir}")

    # A named folder may pre-exist (and mkdtemp always creates its directory):
    # either reuse it as-is or wipe it so the clone starts clean.
    if os.path.exists(tmp_dir):
        if remove_existing:
            logger.info(f"Removing existing directory {tmp_dir}")
            shutil.rmtree(tmp_dir)
        else:
            return tmp_dir

    clone_args = []
    if shallow:
        clone_args.extend(["--depth", "1"])
    if sparse:
        clone_args.append("--sparse")
    if blob_filter:
        clone_args.extend(["--filter", "blob:none"])
    if head_ref:
        clone_args.extend(["--revision", head_ref])

    try:
        repo = Repo.clone_from(repo_url, tmp_dir, multi_options=clone_args)
        repository_path = _repo_to_path(repo)
        if sparse and blob_filter:
            logger.info("Partial clone ready - file contents will be fetched on-demand during git operations")
        if base_ref:
            # The base ref is needed later to compute the PR diff.
            fetch_commit_ref(repository_path, base_ref)
    except GitCommandError as e:
        logger.exception(f"Clone failed: {e}\nClone arguments used: {clone_args}")
        # Bare raise preserves the original traceback.
        raise

    return repository_path
fetch_commit_ref(repo_path: str, commit_ref: str) -> None

Fetch a base reference from the remote repository.

Parameters:

Name Type Description Default
repo_path str

Path to the git repository

required
commit_ref str

Commit reference to fetch (e.g., branch name, commit hash)

required

Raises:

Type Description
GitCommandError

If the fetch operation fails

Source code in src/lampe/core/tools/repository/management.py
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
def fetch_commit_ref(repo_path: str, commit_ref: str) -> None:
    """Fetch a single commit reference from the origin remote.

    The fetch is shallow (--depth=1), tag-free, and blob-filtered so only
    the minimal data for the reference is transferred.

    Parameters
    ----------
    repo_path
        Path to the git repository
    commit_ref
        Commit reference to fetch (e.g., branch name, commit hash)

    Raises
    ------
    GitCommandError
        If the fetch operation fails
    """
    repository = Repo(path=repo_path)
    repository.git.fetch("--no-tags", "--depth=1", "--filter=blob:none", "origin", commit_ref)
is_sparse_clone(repo_path: str) -> bool

Check if a repository is a sparse clone.

A sparse clone is detected by checking multiple indicators: 1. If core.sparseCheckout is enabled 2. If .git/info/sparse-checkout file exists and has content

Parameters:

Name Type Description Default
repo_path str

Path to the git repository

required

Returns:

Type Description
bool

True if the repository appears to be a sparse clone, False otherwise

Raises:

Type Description
GitCommandError

If git commands fail

Source code in src/lampe/core/tools/repository/management.py
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
def is_sparse_clone(repo_path: str) -> bool:
    """Check whether a repository looks like a sparse clone.

    Two indicators are examined, in order:
    1. the core.sparseCheckout config flag being "true"
    2. a non-empty .git/info/sparse-checkout file

    Any unexpected error is logged and treated as "not sparse".

    Parameters
    ----------
    repo_path
        Path to the git repository

    Returns
    -------
    bool
        True if the repository appears to be a sparse clone, False otherwise
    """
    try:
        repository = Repo(path=repo_path)

        # Indicator 1: the sparse-checkout config flag.
        try:
            if repository.git.config("core.sparseCheckout").strip().lower() == "true":
                logger.debug(f"Sparse checkout enabled in {repo_path}")
                return True
        except GitCommandError:
            # Flag not set at all — fall through to the file-based check.
            pass

        # Indicator 2: a populated sparse-checkout patterns file.
        marker_file = Path(repo_path) / ".git" / "info" / "sparse-checkout"
        if marker_file.exists() and marker_file.read_text().strip():
            logger.debug(f"Sparse checkout file found with content in {repo_path}")
            return True

        logger.debug(f"No sparse clone indicators found in {repo_path}")
        return False

    except Exception as e:
        logger.exception(f"Error checking if repository is sparse clone: {e}")
        return False
search
find_files_by_pattern(pattern: str, repo_path: str = '/tmp/') -> str

Search for files using git ls-files and pattern matching.

Parameters:

Name Type Description Default
pattern str

Pattern to search for (e.g. "*.py", "src/**/*.md")

required
repo_path str

Path to git repository

'/tmp/'

Returns:

Type Description
str

Formatted string containing matching file paths

Source code in src/lampe/core/tools/repository/search.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def find_files_by_pattern(pattern: str, repo_path: str = "/tmp/") -> str:
    """List tracked files that match a pathspec pattern via `git ls-files`.

    Parameters
    ----------
    pattern
        Pathspec to match (e.g. "*.py", "src/**/*.md")
    repo_path
        Path to git repository

    Returns
    -------
    str
        Formatted string containing matching file paths
    """
    repository = Repo(path=repo_path)
    try:
        # git's own pathspec matching does the filtering; "--" guards
        # against the pattern being parsed as an option.
        raw_listing = sanitize_utf8(repository.git.ls_files("--", pattern))
        paths = raw_listing.splitlines()

        if not paths:
            return "No files found"

        joined = "\n".join(paths)
        return f"```shell\n{joined}\n```"

    except GitCommandError as e:
        logger.exception(f"Error finding files: {e}")
        return f"Error: {str(e)}"
search_in_files(pattern: str, relative_dir_path: str, commit_reference: str, include_line_numbers: bool = False, repo_path: str = '/tmp/') -> str

Search for a pattern in files within a directory at a specific commit.

Parameters:

Name Type Description Default
pattern str

Pattern to search for

required
relative_dir_path str

Directory path to search in

required
commit_reference str

Commit reference to search at

required
include_line_numbers bool

Whether to include line numbers in search results (default: False)

False
repo_path str

Path to the git repository, by default "/tmp/"

'/tmp/'

Returns:

Type Description
str

Search results as a string

Source code in src/lampe/core/tools/repository/search.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def search_in_files(
    pattern: str,
    relative_dir_path: str,
    commit_reference: str,
    include_line_numbers: bool = False,
    repo_path: str = "/tmp/",
) -> str:
    """Search for a pattern in files within a directory at a specific commit.

    Parameters
    ----------
    pattern
        Pattern to search for
    relative_dir_path
        Directory path to search in
    commit_reference
        Commit reference to search at
    include_line_numbers
        Whether to include line numbers in search results (default: False)
    repo_path
        Path to the git repository, by default "/tmp/"

    Returns
    -------
    str
        Search results as a string
    """
    try:
        repository = Repo(path=repo_path)
        # Normalize the directory: trim whitespace/trailing slashes; an empty
        # value means "search the repository root".
        directory = (relative_dir_path or "").strip().rstrip("/") or "."
        target = commit_reference if directory == "." else f"{commit_reference}:{directory}"
        grep_args = ["-n", pattern, target] if include_line_numbers else [pattern, target]
        result = repository.git.grep(*grep_args)
        if result:
            return f"```grep\n{sanitize_utf8(result)}\n```"
        return "No matches found"
    except GitCommandError as e:
        # Exit 1 = no match (git grep documented behavior)
        if e.status == 1:
            return "No matches found"
        # 128 and other codes = real errors (invalid ref, path issues, etc.)
        return f"Error executing git grep: {str(e)}"

utils

truncate_to_token_limit(content: str, max_tokens: int) -> str

Truncate the content to the maximum number of tokens. If the content is too long, truncate it to 200000 characters (3-4 characters per token) before encoding for performance reasons. We allow endoftext token to be encoded, since in the past we encountered issues with the tokenizer.

Args: content (str): The content to truncate. max_tokens (int): The maximum number of tokens to keep.

Returns: str: The truncated content.

Source code in src/lampe/core/utils/token.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def truncate_to_token_limit(content: str, max_tokens: int) -> str:
    """Truncate the content to the maximum number of tokens.

    If the content is very long, it is first cut to 200000 characters
    (CHARACTER_TRUNCATION_THRESHOLD; roughly 3-4 characters per token)
    before encoding, for performance reasons. Special tokens such as
    `endoftext` are allowed to be encoded (disallowed_special=()), since
    the tokenizer previously raised on them.

    Parameters
    ----------
    content : str
        The content to truncate.
    max_tokens : int
        The maximum number of tokens to keep.

    Returns
    -------
    str
        The truncated content.

    Raises
    ------
    ValueError
        If max_tokens is not a positive integer.
    """
    if max_tokens <= 0:
        raise ValueError("max_tokens must be a positive integer")
    if len(content) >= CHARACTER_TRUNCATION_THRESHOLD:
        logger.warning(
            f"Truncating content to {CHARACTER_TRUNCATION_THRESHOLD} characters before encoding "
            f"for performance reasons. Content length: {len(content)}"
        )
        content = safe_truncate(content, CHARACTER_TRUNCATION_THRESHOLD)
    # Decode only the first max_tokens tokens back into text.
    tokens = encoder.encode(
        content,
        disallowed_special=(),
    )
    truncated = encoder.decode(tokens[:max_tokens])
    return truncated
token
truncate_to_token_limit(content: str, max_tokens: int) -> str

Truncate the content to the maximum number of tokens. If the content is too long, truncate it to 200000 characters (3-4 characters per token) before encoding for performance reasons. We allow endoftext token to be encoded, since in the past we encountered issues with the tokenizer.

Args: content (str): The content to truncate. max_tokens (int): The maximum number of tokens to keep.

Returns: str: The truncated content.

Source code in src/lampe/core/utils/token.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def truncate_to_token_limit(content: str, max_tokens: int) -> str:
    """Truncate the content to the maximum number of tokens.

    If the content is very long, it is first cut to 200000 characters
    (CHARACTER_TRUNCATION_THRESHOLD; roughly 3-4 characters per token)
    before encoding, for performance reasons. Special tokens such as
    `endoftext` are allowed to be encoded (disallowed_special=()), since
    the tokenizer previously raised on them.

    Parameters
    ----------
    content : str
        The content to truncate.
    max_tokens : int
        The maximum number of tokens to keep.

    Returns
    -------
    str
        The truncated content.

    Raises
    ------
    ValueError
        If max_tokens is not a positive integer.
    """
    if max_tokens <= 0:
        raise ValueError("max_tokens must be a positive integer")
    if len(content) >= CHARACTER_TRUNCATION_THRESHOLD:
        logger.warning(
            f"Truncating content to {CHARACTER_TRUNCATION_THRESHOLD} characters before encoding "
            f"for performance reasons. Content length: {len(content)}"
        )
        content = safe_truncate(content, CHARACTER_TRUNCATION_THRESHOLD)
    # Decode only the first max_tokens tokens back into text.
    tokens = encoder.encode(
        content,
        disallowed_special=(),
    )
    truncated = encoder.decode(tokens[:max_tokens])
    return truncated

describe

PRDescriptionWorkflow(truncation_tokens=MAX_TOKENS, *args, **kwargs)

Bases: Workflow

A workflow that generates a PR description.

Based on the pull request's diff generate a clear, concise description explaining what are the changes being made and why.

Parameters:

Name Type Description Default
truncation_tokens

Maximum number of tokens to use for the diff content, by default MAX_TOKENS

MAX_TOKENS
Source code in packages/lampe-describe/src/lampe/describe/workflows/pr_description/generation.py
47
48
49
50
51
def __init__(self, truncation_tokens=MAX_TOKENS, *args, **kwargs):
    """Initialize the workflow.

    Parameters
    ----------
    truncation_tokens
        Maximum number of tokens to use for the diff content, by default MAX_TOKENS
    """
    super().__init__(*args, **kwargs)
    # LLM used to generate the description; the model is resolved via
    # get_model("LAMPE_MODEL_DESCRIBE", ...) with a GPT-5-nano default.
    self.llm = LiteLLM(model=get_model("LAMPE_MODEL_DESCRIBE", MODELS.GPT_5_NANO_2025_08_07), temperature=1.0)
    self.truncation_tokens = truncation_tokens
    # Strips markdown code fences from the raw model output.
    self.output_parser = MarkdownCodeBlockRemoverOutputParser()
generate_description(ev: PRDescriptionPromptEvent) -> StopEvent async

Generate a PR description.

This step generates a PR description using the LLM. It uses the truncated diff of all the changes between 2 commits.

Parameters:

Name Type Description Default
ev PRDescriptionPromptEvent

The prompt event containing the prepared diff and prompt.

required

Returns:

Type Description
StopEvent

The stop event containing the generated description.

Source code in packages/lampe-describe/src/lampe/describe/workflows/pr_description/generation.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
@step
async def generate_description(self, ev: PRDescriptionPromptEvent) -> StopEvent:
    """Generate a PR description.

    Sends the system prompt plus the prepared user prompt to the LLM and
    wraps the parsed answer in a stop event.

    Parameters
    ----------
    ev
        The prompt event containing the prepared diff and prompt.

    Returns
    -------
    :
        The stop event containing the generated description.
    """
    chat_messages = [
        ChatMessage(role=MessageRole.SYSTEM, content=SYSTEM_PR_DESCRIPTION_MESSAGE),
        ChatMessage(role=MessageRole.USER, content=ev.formatted_prompt),
    ]
    llm_response = await self.llm.achat(messages=chat_messages)

    raw_text = llm_response.message.content or ""
    description = self.output_parser.parse(raw_text)
    return StopEvent(result=PRDescriptionOutput(description=description))
prepare_diff_and_prompt(ev: PRDescriptionStartEvent) -> PRDescriptionPromptEvent async

Prepare the diff and prompt for the LLM.

This step prepares the diff and prompt for the LLM. It truncates the diff to the maximum number of tokens and formats the prompt. The diff is filtered using files_exclude_patterns, files_include_patterns and files_reinclude_patterns. The files_reinclude_patterns allow overriding files_exclude_patterns, which is useful for patterns like "!readme.txt" that should override "*.txt" exclusions.

Parameters:

Name Type Description Default
ev PRDescriptionStartEvent

The start event containing the PR details.

required

Returns:

Type Description
PRDescriptionPromptEvent

The prompt event containing the prepared diff and prompt.

Source code in packages/lampe-describe/src/lampe/describe/workflows/pr_description/generation.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@step
async def prepare_diff_and_prompt(self, ev: PRDescriptionStartEvent) -> PRDescriptionPromptEvent:
    """Prepare the diff and prompt for the LLM.

    Computes the diff between the PR's base and head commits, filters it
    with files_exclude_patterns, truncates it to the configured token
    budget, and formats the user prompt.

    NOTE(review): earlier documentation also mentioned files_include_patterns
    and files_reinclude_patterns, but only files_exclude_patterns is passed
    to get_diff_between_commits below — confirm whether the other filters
    should be wired through.

    Parameters
    ----------
    ev
        The start event containing the PR details.

    Returns
    -------
    :
        The prompt event containing the prepared diff and prompt.
    """
    repo_path = ev.repository.local_path
    base_hash = ev.pull_request.base_commit_hash
    head_hash = ev.pull_request.head_commit_hash
    diff = get_diff_between_commits(
        base_hash, head_hash, files_exclude_patterns=ev.files_exclude_patterns, repo_path=repo_path
    )
    # Keep the prompt within the model's budget.
    diff = truncate_to_token_limit(diff, self.truncation_tokens)
    formatted_prompt = USER_PR_DESCRIPTION_MESSAGE.format(
        pr_title=ev.pr_title,
        pull_request_diff=diff,
    )
    return PRDescriptionPromptEvent(formatted_prompt=formatted_prompt)

generate_pr_description(repository: Repository, pull_request: PullRequest, files_exclude_patterns: list[str] | None = None, files_reinclude_patterns: list[str] | None = None, truncation_tokens: int = MAX_TOKENS, timeout: int | None = None, verbose: bool = False, metadata: dict | None = None) -> PRDescriptionOutput async

Generate a PR description.

This function generates a PR description for a given pull request. It uses the PRDescriptionWorkflow to generate the description.

Parameters:

Name Type Description Default
repository Repository

The repository to generate the PR description for.

required
pull_request PullRequest

The pull request to generate the PR description for.

required
files_exclude_patterns list[str] | None

The glob matching patterns to exclude from the diff, by default None

None
files_reinclude_patterns list[str] | None

The glob matching patterns to re-include in the diff, by default None

None
truncation_tokens int

The maximum number of tokens to use for the diff content, by default MAX_TOKENS

MAX_TOKENS
timeout int | None

The timeout for the workflow, by default None

None
verbose bool

Whether to print verbose output, by default False

False
metadata dict | None

The metadata to use for the workflow, by default None

None

Returns:

Type Description
PRDescriptionOutput

The output containing the generated description.

Source code in packages/lampe-describe/src/lampe/describe/workflows/pr_description/generation.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
async def generate_pr_description(
    repository: Repository,
    pull_request: PullRequest,
    files_exclude_patterns: list[str] | None = None,
    files_reinclude_patterns: list[str] | None = None,
    truncation_tokens: int = MAX_TOKENS,
    timeout: int | None = None,
    verbose: bool = False,
    metadata: dict | None = None,
) -> PRDescriptionOutput:
    """Generate a PR description.

    This function generates a PR description for a given pull request.
    It uses the PRDescriptionWorkflow to generate the description.

    NOTE(review): files_reinclude_patterns and metadata are accepted but
    never forwarded to the workflow below — confirm whether they should be
    wired through or removed from the signature.

    Parameters
    ----------
    repository
        The repository to generate the PR description for.
    pull_request
        The pull request to generate the PR description for.
    files_exclude_patterns
        The glob matching patterns to exclude from the diff, by default None
    files_reinclude_patterns
        The glob matching patterns to re-include in the diff, by default None
    truncation_tokens
        The maximum number of tokens to use for the diff content, by default MAX_TOKENS
    timeout
        The timeout for the workflow, by default None
    verbose
        Whether to print verbose output, by default False
    metadata
        The metadata to use for the workflow, by default None

    Returns
    -------
    :
        The output containing the generated description.
    """
    if files_exclude_patterns is None:
        files_exclude_patterns = []
    workflow = PRDescriptionWorkflow(truncation_tokens=truncation_tokens, timeout=timeout, verbose=verbose)
    result = await workflow.run(
        start_event=PRDescriptionStartEvent(
            pr_title=pull_request.title,
            repository=repository,
            pull_request=pull_request,
            files_exclude_patterns=files_exclude_patterns,
        )
    )
    return result

workflows

PRDescriptionWorkflow(truncation_tokens=MAX_TOKENS, *args, **kwargs)

Bases: Workflow

A workflow that generates a PR description.

Based on the pull request's diff generate a clear, concise description explaining what are the changes being made and why.

Parameters:

Name Type Description Default
truncation_tokens

Maximum number of tokens to use for the diff content, by default MAX_TOKENS

MAX_TOKENS
Source code in packages/lampe-describe/src/lampe/describe/workflows/pr_description/generation.py
47
48
49
50
51
def __init__(self, truncation_tokens=MAX_TOKENS, *args, **kwargs):
    """Initialize the workflow.

    Parameters
    ----------
    truncation_tokens
        Maximum number of tokens to use for the diff content, by default MAX_TOKENS
    """
    super().__init__(*args, **kwargs)
    # LLM used to generate the description; the model is resolved via
    # get_model("LAMPE_MODEL_DESCRIBE", ...) with a GPT-5-nano default.
    self.llm = LiteLLM(model=get_model("LAMPE_MODEL_DESCRIBE", MODELS.GPT_5_NANO_2025_08_07), temperature=1.0)
    self.truncation_tokens = truncation_tokens
    # Strips markdown code fences from the raw model output.
    self.output_parser = MarkdownCodeBlockRemoverOutputParser()
generate_description(ev: PRDescriptionPromptEvent) -> StopEvent async

Generate a PR description.

This step generates a PR description using the LLM. It uses the truncated diff of all the changes between 2 commits.

Parameters:

Name Type Description Default
ev PRDescriptionPromptEvent

The prompt event containing the prepared diff and prompt.

required

Returns:

Type Description
StopEvent

The stop event containing the generated description.

Source code in packages/lampe-describe/src/lampe/describe/workflows/pr_description/generation.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
@step
async def generate_description(self, ev: PRDescriptionPromptEvent) -> StopEvent:
    """Generate a PR description.

    Sends the system prompt plus the prepared user prompt to the LLM and
    wraps the parsed answer in a stop event.

    Parameters
    ----------
    ev
        The prompt event containing the prepared diff and prompt.

    Returns
    -------
    :
        The stop event containing the generated description.
    """
    chat_messages = [
        ChatMessage(role=MessageRole.SYSTEM, content=SYSTEM_PR_DESCRIPTION_MESSAGE),
        ChatMessage(role=MessageRole.USER, content=ev.formatted_prompt),
    ]
    llm_response = await self.llm.achat(messages=chat_messages)

    raw_text = llm_response.message.content or ""
    description = self.output_parser.parse(raw_text)
    return StopEvent(result=PRDescriptionOutput(description=description))
prepare_diff_and_prompt(ev: PRDescriptionStartEvent) -> PRDescriptionPromptEvent async

Prepare the diff and prompt for the LLM.

This step prepares the diff and prompt for the LLM. It truncates the diff to the maximum number of tokens and formats the prompt. The diff is filtered using files_exclude_patterns, files_include_patterns and files_reinclude_patterns. The files_reinclude_patterns allow overriding files_exclude_patterns, which is useful for patterns like "!readme.txt" that should override "*.txt" exclusions.

Parameters:

Name Type Description Default
ev PRDescriptionStartEvent

The start event containing the PR details.

required

Returns:

Type Description
PRDescriptionPromptEvent

The prompt event containing the prepared diff and prompt.

Source code in packages/lampe-describe/src/lampe/describe/workflows/pr_description/generation.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@step
async def prepare_diff_and_prompt(self, ev: PRDescriptionStartEvent) -> PRDescriptionPromptEvent:
    """Prepare the diff and prompt for the LLM.

    This step prepares the diff and prompt for the LLM.
    It truncates the diff to the maximum number of tokens and formats the prompt.
    The diff is filtered using files_exclude_patterns.

    NOTE(review): an earlier version of this docstring also claimed filtering
    via files_include_patterns and files_reinclude_patterns, but only
    files_exclude_patterns is passed to get_diff_between_commits below --
    confirm whether the other filters were meant to be applied.

    Parameters
    ----------
    ev
        The start event containing the PR details.

    Returns
    -------
    :
        The prompt event containing the prepared diff and prompt.
    """
    repo_path = ev.repository.local_path
    base_hash = ev.pull_request.base_commit_hash
    head_hash = ev.pull_request.head_commit_hash
    diff = get_diff_between_commits(
        base_hash, head_hash, files_exclude_patterns=ev.files_exclude_patterns, repo_path=repo_path
    )
    # Keep the diff within the model's budget before formatting the prompt.
    diff = truncate_to_token_limit(diff, self.truncation_tokens)
    formatted_prompt = USER_PR_DESCRIPTION_MESSAGE.format(
        pr_title=ev.pr_title,
        pull_request_diff=diff,
    )
    return PRDescriptionPromptEvent(formatted_prompt=formatted_prompt)
pr_description
PRDescriptionWorkflow(truncation_tokens=MAX_TOKENS, *args, **kwargs)

Bases: Workflow

A workflow that generates a PR description.

Based on the pull request's diff, generate a clear, concise description explaining what changes are being made and why.

Parameters:

Name Type Description Default
truncation_tokens

Maximum number of tokens to use for the diff content, by default MAX_TOKENS

MAX_TOKENS
Source code in packages/lampe-describe/src/lampe/describe/workflows/pr_description/generation.py
47
48
49
50
51
def __init__(self, truncation_tokens=MAX_TOKENS, *args, **kwargs):
    """Initialize the workflow with its token budget, LLM, and output parser."""
    super().__init__(*args, **kwargs)
    self.truncation_tokens = truncation_tokens
    self.output_parser = MarkdownCodeBlockRemoverOutputParser()
    self.llm = LiteLLM(model=get_model("LAMPE_MODEL_DESCRIBE", MODELS.GPT_5_NANO_2025_08_07), temperature=1.0)
generate_description(ev: PRDescriptionPromptEvent) -> StopEvent async

Generate a PR description.

This step generates a PR description using the LLM. It uses the truncated diff of all the changes between 2 commits.

Parameters:

Name Type Description Default
ev PRDescriptionPromptEvent

The prompt event containing the prepared diff and prompt.

required

Returns:

Type Description
StopEvent

The stop event containing the generated description.

Source code in packages/lampe-describe/src/lampe/describe/workflows/pr_description/generation.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
@step
async def generate_description(self, ev: PRDescriptionPromptEvent) -> StopEvent:
    """Generate a PR description.

    Sends the prepared system/user prompt pair to the LLM, strips any
    markdown code-block wrapper from the reply, and returns the result.

    Parameters
    ----------
    ev
        The prompt event containing the prepared diff and prompt.

    Returns
    -------
    :
        The stop event containing the generated description.
    """
    chat_history = [
        ChatMessage(role=MessageRole.SYSTEM, content=SYSTEM_PR_DESCRIPTION_MESSAGE),
        ChatMessage(role=MessageRole.USER, content=ev.formatted_prompt),
    ]
    response = await self.llm.achat(messages=chat_history)
    raw_text = response.message.content or ""
    description = self.output_parser.parse(raw_text)
    return StopEvent(result=PRDescriptionOutput(description=description))
prepare_diff_and_prompt(ev: PRDescriptionStartEvent) -> PRDescriptionPromptEvent async

Prepare the diff and prompt for the LLM.

This step prepares the diff and prompt for the LLM. It truncates the diff to the maximum number of tokens and formats the prompt. The diff is filtered using files_exclude_patterns, files_include_patterns and files_reinclude_patterns. The files_reinclude_patterns allow overriding files_exclude_patterns, which is useful for patterns like "!readme.txt" that should override "*.txt" exclusions.

Parameters:

Name Type Description Default
ev PRDescriptionStartEvent

The start event containing the PR details.

required

Returns:

Type Description
PRDescriptionPromptEvent

The prompt event containing the prepared diff and prompt.

Source code in packages/lampe-describe/src/lampe/describe/workflows/pr_description/generation.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@step
async def prepare_diff_and_prompt(self, ev: PRDescriptionStartEvent) -> PRDescriptionPromptEvent:
    """Prepare the diff and prompt for the LLM.

    This step prepares the diff and prompt for the LLM.
    It truncates the diff to the maximum number of tokens and formats the prompt.
    The diff is filtered using files_exclude_patterns.

    NOTE(review): an earlier version of this docstring also claimed filtering
    via files_include_patterns and files_reinclude_patterns, but only
    files_exclude_patterns is passed to get_diff_between_commits below --
    confirm whether the other filters were meant to be applied.

    Parameters
    ----------
    ev
        The start event containing the PR details.

    Returns
    -------
    :
        The prompt event containing the prepared diff and prompt.
    """
    repo_path = ev.repository.local_path
    base_hash = ev.pull_request.base_commit_hash
    head_hash = ev.pull_request.head_commit_hash
    diff = get_diff_between_commits(
        base_hash, head_hash, files_exclude_patterns=ev.files_exclude_patterns, repo_path=repo_path
    )
    # Keep the diff within the model's budget before formatting the prompt.
    diff = truncate_to_token_limit(diff, self.truncation_tokens)
    formatted_prompt = USER_PR_DESCRIPTION_MESSAGE.format(
        pr_title=ev.pr_title,
        pull_request_diff=diff,
    )
    return PRDescriptionPromptEvent(formatted_prompt=formatted_prompt)
data_models
PRDescriptionInput

Bases: BaseModel

Input for PR description generation workflow.

generation
PRDescriptionWorkflow(truncation_tokens=MAX_TOKENS, *args, **kwargs)

Bases: Workflow

A workflow that generates a PR description.

Based on the pull request's diff, generate a clear, concise description explaining what changes are being made and why.

Parameters:

Name Type Description Default
truncation_tokens

Maximum number of tokens to use for the diff content, by default MAX_TOKENS

MAX_TOKENS
Source code in packages/lampe-describe/src/lampe/describe/workflows/pr_description/generation.py
47
48
49
50
51
def __init__(self, truncation_tokens=MAX_TOKENS, *args, **kwargs):
    """Initialize the workflow with its token budget, LLM, and output parser."""
    super().__init__(*args, **kwargs)
    self.truncation_tokens = truncation_tokens
    self.output_parser = MarkdownCodeBlockRemoverOutputParser()
    self.llm = LiteLLM(model=get_model("LAMPE_MODEL_DESCRIBE", MODELS.GPT_5_NANO_2025_08_07), temperature=1.0)
generate_description(ev: PRDescriptionPromptEvent) -> StopEvent async

Generate a PR description.

This step generates a PR description using the LLM. It uses the truncated diff of all the changes between 2 commits.

Parameters:

Name Type Description Default
ev PRDescriptionPromptEvent

The prompt event containing the prepared diff and prompt.

required

Returns:

Type Description
StopEvent

The stop event containing the generated description.

Source code in packages/lampe-describe/src/lampe/describe/workflows/pr_description/generation.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
@step
async def generate_description(self, ev: PRDescriptionPromptEvent) -> StopEvent:
    """Generate a PR description.

    Sends the prepared system/user prompt pair to the LLM, strips any
    markdown code-block wrapper from the reply, and returns the result.

    Parameters
    ----------
    ev
        The prompt event containing the prepared diff and prompt.

    Returns
    -------
    :
        The stop event containing the generated description.
    """
    chat_history = [
        ChatMessage(role=MessageRole.SYSTEM, content=SYSTEM_PR_DESCRIPTION_MESSAGE),
        ChatMessage(role=MessageRole.USER, content=ev.formatted_prompt),
    ]
    response = await self.llm.achat(messages=chat_history)
    raw_text = response.message.content or ""
    description = self.output_parser.parse(raw_text)
    return StopEvent(result=PRDescriptionOutput(description=description))
prepare_diff_and_prompt(ev: PRDescriptionStartEvent) -> PRDescriptionPromptEvent async

Prepare the diff and prompt for the LLM.

This step prepares the diff and prompt for the LLM. It truncates the diff to the maximum number of tokens and formats the prompt. The diff is filtered using files_exclude_patterns, files_include_patterns and files_reinclude_patterns. The files_reinclude_patterns allow overriding files_exclude_patterns, which is useful for patterns like "!readme.txt" that should override "*.txt" exclusions.

Parameters:

Name Type Description Default
ev PRDescriptionStartEvent

The start event containing the PR details.

required

Returns:

Type Description
PRDescriptionPromptEvent

The prompt event containing the prepared diff and prompt.

Source code in packages/lampe-describe/src/lampe/describe/workflows/pr_description/generation.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@step
async def prepare_diff_and_prompt(self, ev: PRDescriptionStartEvent) -> PRDescriptionPromptEvent:
    """Prepare the diff and prompt for the LLM.

    This step prepares the diff and prompt for the LLM.
    It truncates the diff to the maximum number of tokens and formats the prompt.
    The diff is filtered using files_exclude_patterns.

    NOTE(review): an earlier version of this docstring also claimed filtering
    via files_include_patterns and files_reinclude_patterns, but only
    files_exclude_patterns is passed to get_diff_between_commits below --
    confirm whether the other filters were meant to be applied.

    Parameters
    ----------
    ev
        The start event containing the PR details.

    Returns
    -------
    :
        The prompt event containing the prepared diff and prompt.
    """
    repo_path = ev.repository.local_path
    base_hash = ev.pull_request.base_commit_hash
    head_hash = ev.pull_request.head_commit_hash
    diff = get_diff_between_commits(
        base_hash, head_hash, files_exclude_patterns=ev.files_exclude_patterns, repo_path=repo_path
    )
    # Keep the diff within the model's budget before formatting the prompt.
    diff = truncate_to_token_limit(diff, self.truncation_tokens)
    formatted_prompt = USER_PR_DESCRIPTION_MESSAGE.format(
        pr_title=ev.pr_title,
        pull_request_diff=diff,
    )
    return PRDescriptionPromptEvent(formatted_prompt=formatted_prompt)
generate_pr_description(repository: Repository, pull_request: PullRequest, files_exclude_patterns: list[str] | None = None, files_reinclude_patterns: list[str] | None = None, truncation_tokens: int = MAX_TOKENS, timeout: int | None = None, verbose: bool = False, metadata: dict | None = None) -> PRDescriptionOutput async

Generate a PR description.

This function generates a PR description for a given pull request. It uses the PRDescriptionWorkflow to generate the description.

Parameters:

Name Type Description Default
repository Repository

The repository to generate the PR description for.

required
pull_request PullRequest

The pull request to generate the PR description for.

required
files_exclude_patterns list[str] | None

The glob matching patterns to exclude from the diff, by default None

None
files_reinclude_patterns list[str] | None

The glob matching patterns to re-include in the diff, by default None

None
truncation_tokens int

The maximum number of tokens to use for the diff content, by default MAX_TOKENS

MAX_TOKENS
timeout int | None

The timeout for the workflow, by default None

None
verbose bool

Whether to print verbose output, by default False

False
metadata dict | None

The metadata to use for the workflow, by default None

None

Returns:

Type Description
PRDescriptionOutput

The output containing the generated description.

Source code in packages/lampe-describe/src/lampe/describe/workflows/pr_description/generation.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
async def generate_pr_description(
    repository: Repository,
    pull_request: PullRequest,
    files_exclude_patterns: list[str] | None = None,
    files_reinclude_patterns: list[str] | None = None,
    truncation_tokens: int = MAX_TOKENS,
    timeout: int | None = None,
    verbose: bool = False,
    metadata: dict | None = None,
) -> PRDescriptionOutput:
    """Generate a PR description.

    This function generates a PR description for a given pull request.
    It uses the PRDescriptionWorkflow to generate the description.

    NOTE(review): ``files_reinclude_patterns`` and ``metadata`` are accepted
    and documented here but are never forwarded to the workflow or the start
    event below -- confirm whether they should be wired through.

    Parameters
    ----------
    repository
        The repository to generate the PR description for.
    pull_request
        The pull request to generate the PR description for.
    files_exclude_patterns
        The glob matching patterns to exclude from the diff, by default None
    files_reinclude_patterns
        The glob matching patterns to re-include in the diff, by default None
    truncation_tokens
        The maximum number of tokens to use for the diff content, by default MAX_TOKENS
    timeout
        The timeout for the workflow, by default None
    verbose
        Whether to print verbose output, by default False
    metadata
        The metadata to use for the workflow, by default None

    Returns
    -------
    :
        The output containing the generated description.
    """
    if files_exclude_patterns is None:
        files_exclude_patterns = []
    workflow = PRDescriptionWorkflow(truncation_tokens=truncation_tokens, timeout=timeout, verbose=verbose)
    result = await workflow.run(
        start_event=PRDescriptionStartEvent(
            pr_title=pull_request.title,
            repository=repository,
            pull_request=pull_request,
            files_exclude_patterns=files_exclude_patterns,
        )
    )
    return result

review

AgenticReviewComplete

Bases: StopEvent

Complete event for agentic review workflow.

AgenticReviewStart

Bases: StartEvent

Start event for agentic review workflow.

generate_agentic_pr_review(repository: Repository, pull_request: PullRequest, review_depth: ReviewDepth = ReviewDepth.STANDARD, custom_guidelines: list[str] | None = None, files_exclude_patterns: list[str] | None = None, timeout: int | None = None, verbose: bool = False) -> AgenticReviewComplete async

Generate a PR review using the agentic orchestrator workflow.

Source code in packages/lampe-review/src/lampe/review/workflows/agentic_review/agentic_review_workflow.py
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
async def generate_agentic_pr_review(
    repository: Repository,
    pull_request: PullRequest,
    review_depth: ReviewDepth = ReviewDepth.STANDARD,
    custom_guidelines: list[str] | None = None,
    files_exclude_patterns: list[str] | None = None,
    timeout: int | None = None,
    verbose: bool = False,
) -> AgenticReviewComplete:
    """Generate a PR review using the agentic orchestrator workflow.

    Builds a PRReviewInput from the arguments, runs AgenticReviewWorkflow,
    and returns its completion event.
    """
    patterns = files_exclude_patterns if files_exclude_patterns is not None else []
    start = AgenticReviewStart(
        input=PRReviewInput(
            repository=repository,
            pull_request=pull_request,
            review_depth=review_depth,
            custom_guidelines=custom_guidelines,
            files_exclude_patterns=patterns,
        )
    )
    runner = AgenticReviewWorkflow(timeout=timeout, verbose=verbose)
    outcome: AgenticReviewComplete = await runner.run(start_event=start)
    return outcome

workflows

AgenticReviewComplete

Bases: StopEvent

Complete event for agentic review workflow.

AgenticReviewStart

Bases: StartEvent

Start event for agentic review workflow.

generate_agentic_pr_review(repository: Repository, pull_request: PullRequest, review_depth: ReviewDepth = ReviewDepth.STANDARD, custom_guidelines: list[str] | None = None, files_exclude_patterns: list[str] | None = None, timeout: int | None = None, verbose: bool = False) -> AgenticReviewComplete async

Generate a PR review using the agentic orchestrator workflow.

Source code in packages/lampe-review/src/lampe/review/workflows/agentic_review/agentic_review_workflow.py
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
async def generate_agentic_pr_review(
    repository: Repository,
    pull_request: PullRequest,
    review_depth: ReviewDepth = ReviewDepth.STANDARD,
    custom_guidelines: list[str] | None = None,
    files_exclude_patterns: list[str] | None = None,
    timeout: int | None = None,
    verbose: bool = False,
) -> AgenticReviewComplete:
    """Generate a PR review using the agentic orchestrator workflow.

    Builds a PRReviewInput from the arguments, runs AgenticReviewWorkflow,
    and returns its completion event.
    """
    patterns = files_exclude_patterns if files_exclude_patterns is not None else []
    start = AgenticReviewStart(
        input=PRReviewInput(
            repository=repository,
            pull_request=pull_request,
            review_depth=review_depth,
            custom_guidelines=custom_guidelines,
            files_exclude_patterns=patterns,
        )
    )
    runner = AgenticReviewWorkflow(timeout=timeout, verbose=verbose)
    outcome: AgenticReviewComplete = await runner.run(start_event=start)
    return outcome
agentic_review

Agentic review workflow with orchestrator, validation agents, and skill selection.

AgenticReviewComplete

Bases: StopEvent

Complete event for agentic review workflow.

AgenticReviewStart

Bases: StartEvent

Start event for agentic review workflow.

generate_agentic_pr_review(repository: Repository, pull_request: PullRequest, review_depth: ReviewDepth = ReviewDepth.STANDARD, custom_guidelines: list[str] | None = None, files_exclude_patterns: list[str] | None = None, timeout: int | None = None, verbose: bool = False) -> AgenticReviewComplete async

Generate a PR review using the agentic orchestrator workflow.

Source code in packages/lampe-review/src/lampe/review/workflows/agentic_review/agentic_review_workflow.py
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
async def generate_agentic_pr_review(
    repository: Repository,
    pull_request: PullRequest,
    review_depth: ReviewDepth = ReviewDepth.STANDARD,
    custom_guidelines: list[str] | None = None,
    files_exclude_patterns: list[str] | None = None,
    timeout: int | None = None,
    verbose: bool = False,
) -> AgenticReviewComplete:
    """Generate a PR review using the agentic orchestrator workflow.

    Builds a PRReviewInput from the arguments, runs AgenticReviewWorkflow,
    and returns its completion event.
    """
    patterns = files_exclude_patterns if files_exclude_patterns is not None else []
    start = AgenticReviewStart(
        input=PRReviewInput(
            repository=repository,
            pull_request=pull_request,
            review_depth=review_depth,
            custom_guidelines=custom_guidelines,
            files_exclude_patterns=patterns,
        )
    )
    runner = AgenticReviewWorkflow(timeout=timeout, verbose=verbose)
    outcome: AgenticReviewComplete = await runner.run(start_event=start)
    return outcome
agentic_review_prompt

Prompts for the agentic review orchestrator.

agentic_review_workflow

Main agentic review orchestrator workflow.

AgenticReviewComplete

Bases: StopEvent

Complete event for agentic review workflow.

AgenticReviewStart

Bases: StartEvent

Start event for agentic review workflow.

AgenticReviewWorkflow(timeout: int | None = None, verbose: bool = False, *args: Any, **kwargs: Any)

Bases: Workflow

Orchestrator workflow for agentic review.

Source code in packages/lampe-review/src/lampe/review/workflows/agentic_review/agentic_review_workflow.py
149
150
151
152
153
154
155
156
157
158
159
160
def __init__(
    self,
    timeout: int | None = None,
    verbose: bool = False,
    *args: Any,
    **kwargs: Any,
):
    """Set up logging and the aggregation sub-workflow."""
    super().__init__(*args, timeout=timeout, verbose=verbose, **kwargs)
    # Keep timeout/verbose around so the aggregation sub-workflow can reuse them.
    self.timeout = timeout
    self.verbose = verbose
    self.logger = logging.getLogger(LAMPE_LOGGER_NAME)
    self.aggregation_workflow = LLMAggregationWorkflow(timeout=timeout, verbose=verbose)
aggregate_and_deliver(ctx: Context, ev: ValidationsCompleteEvent) -> AgenticReviewComplete async

Convert to AgentReviewOutput, run QA, deliver.

Source code in packages/lampe-review/src/lampe/review/workflows/agentic_review/agentic_review_workflow.py
308
309
310
311
312
313
314
315
316
317
318
319
320
321
@step
async def aggregate_and_deliver(self, ctx: Context, ev: ValidationsCompleteEvent) -> AgenticReviewComplete:
    """Convert to AgentReviewOutput, run QA, deliver."""
    reviews = _validation_results_to_agent_review_output(ev.results)
    if not reviews:
        # Nothing to aggregate; finish with an empty review set.
        return AgenticReviewComplete(output=[])

    start_event = LLMAggregationStartEvent(
        agent_reviews=reviews,
        files_changed=ev.files_changed,
    )
    aggregated: LLMAggregationCompleteEvent = await self.aggregation_workflow.run(start_event=start_event)
    return AgenticReviewComplete(output=aggregated.aggregated_reviews)
extract_intent_and_select_skills(ctx: Context, ev: AgenticReviewStart) -> TasksPlannedEvent async

Extract PR intent, discover/select skills, plan tasks.

Source code in packages/lampe-review/src/lampe/review/workflows/agentic_review/agentic_review_workflow.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
@step
async def extract_intent_and_select_skills(self, ctx: Context, ev: AgenticReviewStart) -> TasksPlannedEvent:
    """Extract PR intent, discover/select skills, plan tasks.

    Pipeline: list changed files, extract a structured PRIntent from the PR
    title/body via a function-calling program (falling back to a minimal
    intent built from the PR title on any failure), select applicable repo
    skills, plan validation tasks, then append one task per selected skill.
    """
    inp = ev.input
    repo_path = inp.repository.local_path
    base_commit = inp.pull_request.base_commit_hash
    head_commit = inp.pull_request.head_commit_hash

    files_changed = list_changed_files(
        base_reference=base_commit,
        head_reference=head_commit,
        repo_path=repo_path,
    )

    # Intent extraction (FunctionCallingProgram for structured output)
    llm = LiteLLM(model=get_model("LAMPE_MODEL_REVIEW_INTENT", MODELS.GPT_5_2_CODEX), temperature=1)
    intent_prompt = f"{INTENT_EXTRACTION_SYSTEM_PROMPT}\n\n{INTENT_EXTRACTION_USER_PROMPT}"
    try:
        intent_program = FunctionCallingProgram.from_defaults(
            output_cls=PRIntent,
            llm=llm,
            prompt_template_str=intent_prompt,
            tool_required=True,
        )
        pr_intent = await intent_program.acall(
            pr_title=inp.pull_request.title,
            pr_description=inp.pull_request.body or "(no description)",
            files_changed=files_changed,
        )
        if pr_intent is None:
            self.logger.warning("Intent extraction returned None (LLM may not have invoked structured output)")
            pr_intent = PRIntent(
                summary=inp.pull_request.title,
                areas_touched=[],
                suggested_validation_tasks=[],
            )
        elif isinstance(pr_intent, list) and pr_intent:
            # Some program backends return a list of outputs; keep the first.
            pr_intent = pr_intent[0]
        if not isinstance(pr_intent, PRIntent):
            self.logger.warning(f"Intent extraction returned unexpected type: {type(pr_intent)}")
            pr_intent = PRIntent(
                summary=inp.pull_request.title,
                areas_touched=[],
                suggested_validation_tasks=[],
            )
    except Exception as e:
        # Degrade gracefully: a failed extraction still yields a usable intent.
        self.logger.warning(f"Intent extraction failed: {e}", exc_info=True)
        pr_intent = PRIntent(
            summary=inp.pull_request.title,
            areas_touched=[],
            suggested_validation_tasks=[],
        )

    # Skill discovery and selection
    skills = discover_skills(repo_path)
    selected_skills = []
    if skills:
        selected_skills = await select_applicable_skills(
            pr_intent=pr_intent,
            files_changed=files_changed,
            skills=skills,
            llm=llm,
        )

    # Task planning (basic tasks only; skill tasks added separately)
    task_prompt = f"{TASK_PLANNING_SYSTEM_PROMPT}\n\n{TASK_PLANNING_USER_PROMPT}"
    tasks: list[ValidationTask] = []
    try:
        task_program = FunctionCallingProgram.from_defaults(
            output_cls=TaskPlanningOutput,
            llm=llm,
            prompt_template_str=task_prompt,
            tool_required=True,
        )
        task_result = await task_program.acall(
            pr_intent_summary=pr_intent.summary,
            areas_touched=", ".join(pr_intent.areas_touched) or "unknown",
            suggested_tasks="\n".join(f"- {t}" for t in pr_intent.suggested_validation_tasks),
            files_changed=files_changed,
        )
        if task_result is None:
            self.logger.warning("Task planning returned None (LLM may not have invoked structured output)")
        elif isinstance(task_result, list) and task_result:
            task_result = task_result[0]
        if isinstance(task_result, TaskPlanningOutput):
            tasks = task_result.tasks
        else:
            self.logger.warning(
                "Task planning returned unexpected type: %s",
                type(task_result).__name__ if task_result is not None else "None",
            )
    except Exception as e:
        # Planning failure leaves ``tasks`` empty; skill tasks may still be added.
        self.logger.warning(f"Task planning failed: {e}", exc_info=True)

    # Add skill tasks for selected skills
    for i, skill in enumerate(selected_skills):
        tasks.append(
            ValidationTask(
                task_id=f"skill-{skill.name}-{i}",
                description=f"Apply {skill.name} guidelines to validate the changes",
                applicable_skill_paths=[skill.path],
                skill_content=skill.content,
            )
        )

    if not tasks:
        self.logger.warning(
            "No tasks produced: task planning returned empty and no skills selected. "
            "Skipping validation (no fallback to basic validation)."
        )

    return TasksPlannedEvent(
        tasks=tasks,
        files_changed=files_changed,
        repo_path=repo_path,
        base_commit=base_commit,
        head_commit=head_commit,
    )
run_validations(ctx: Context, ev: TasksPlannedEvent) -> ValidationsCompleteEvent async

Run all validation agents (parallel).

Source code in packages/lampe-review/src/lampe/review/workflows/agentic_review/agentic_review_workflow.py
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
@step
async def run_validations(self, ctx: Context, ev: TasksPlannedEvent) -> ValidationsCompleteEvent:
    """Run all validation agents (parallel).

    Each planned task is dispatched to a skill-augmented agent when the task
    carries skill content, otherwise to the basic validation agent.  A failing
    agent is logged and degrades to an empty "no issue" result rather than
    aborting the whole review.
    """

    async def run_one(task: ValidationTask) -> ValidationResult:
        # One agent invocation per task; exceptions are contained per task.
        agent_input = ValidationAgentInput(
            task=task,
            repo_path=ev.repo_path,
            base_commit=ev.base_commit,
            head_commit=ev.head_commit,
            files_changed=ev.files_changed,
        )
        if task.skill_content:
            agent = SkillAugmentedValidationAgent(skill_content=task.skill_content)
        else:
            agent = BasicValidationAgent()
        try:
            complete: ValidationAgentComplete = await agent.run(start_event=ValidationAgentStart(input=agent_input))
            return complete.validation_result
        except Exception as e:
            self.logger.exception(f"Validation agent failed for {task.task_id}: {e}")
            return ValidationResult(task_id=task.task_id, findings=[], no_issue=True, sources=[])

    # Fix: dropped the dead ``results: list[ValidationResult] = []`` assignment
    # that was immediately overwritten -- asyncio.gather builds the list itself.
    results = await asyncio.gather(*[run_one(t) for t in ev.tasks])
    return ValidationsCompleteEvent(results=list(results), files_changed=ev.files_changed)
IntentExtractedEvent

Bases: Event

Event after intent extraction.

TasksPlannedEvent

Bases: Event

Event after task planning.

ValidationsCompleteEvent

Bases: Event

Event after all validations complete.

generate_agentic_pr_review(repository: Repository, pull_request: PullRequest, review_depth: ReviewDepth = ReviewDepth.STANDARD, custom_guidelines: list[str] | None = None, files_exclude_patterns: list[str] | None = None, timeout: int | None = None, verbose: bool = False) -> AgenticReviewComplete async

Generate a PR review using the agentic orchestrator workflow.

Source code in packages/lampe-review/src/lampe/review/workflows/agentic_review/agentic_review_workflow.py
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
async def generate_agentic_pr_review(
    repository: Repository,
    pull_request: PullRequest,
    review_depth: ReviewDepth = ReviewDepth.STANDARD,
    custom_guidelines: list[str] | None = None,
    files_exclude_patterns: list[str] | None = None,
    timeout: int | None = None,
    verbose: bool = False,
) -> AgenticReviewComplete:
    """Generate a PR review using the agentic orchestrator workflow.

    Builds a PRReviewInput from the arguments, runs AgenticReviewWorkflow,
    and returns its completion event.
    """
    patterns = files_exclude_patterns if files_exclude_patterns is not None else []
    start = AgenticReviewStart(
        input=PRReviewInput(
            repository=repository,
            pull_request=pull_request,
            review_depth=review_depth,
            custom_guidelines=custom_guidelines,
            files_exclude_patterns=patterns,
        )
    )
    runner = AgenticReviewWorkflow(timeout=timeout, verbose=verbose)
    outcome: AgenticReviewComplete = await runner.run(start_event=start)
    return outcome
data_models

Data models for the agentic review workflow.

PRIntent

Bases: BaseModel

Extracted intent from the PR.

TaskPlanningOutput

Bases: BaseModel

Structured output from task planning.

ValidationAgentInput

Bases: BaseModel

Input for the validation agent.

ValidationAgentResponseModel

Bases: BaseModel

Pydantic model for validation agent JSON response parsing.

ValidationFinding

Bases: BaseModel

Structured finding from a validation agent.

ValidationResult

Bases: BaseModel

Output from a Validation Agent.

ValidationTask

Bases: BaseModel

Single task to send to a Validation Agent.

response_parse

Utilities for parsing LLM JSON responses with graceful handling of malformed output.

extract_json_from_llm_content(content: str) -> str

Extract JSON from LLM response, supporting markdown code blocks.

Source code in packages/lampe-review/src/lampe/review/workflows/agentic_review/response_parse.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def extract_json_from_llm_content(content: str) -> str:
    """Extract JSON from LLM response, supporting markdown code blocks."""
    if not content or not content.strip():
        return ""
    stripped = content.strip()

    # Prefer a fenced block tagged ``json`` (the tag is optional in the first
    # pattern), then any generic fenced block, then fall back to the raw text.
    for fence_pattern in (r"```(?:json)?\s*\n(.*?)\n```", r"```\s*\n(.*?)\n```"):
        fenced = re.search(fence_pattern, stripped, re.DOTALL)
        if fenced:
            return fenced.group(1).strip()

    return stripped
parse_validation_response(content: str) -> tuple[ValidationAgentResponseModel | None, bool]

Parse LLM content into ValidationAgentResponseModel.

Returns (parsed_model, success). On failure returns (None, False).

Source code in packages/lampe-review/src/lampe/review/workflows/agentic_review/response_parse.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def parse_validation_response(content: str) -> tuple[ValidationAgentResponseModel | None, bool]:
    """
    Parse LLM content into ValidationAgentResponseModel.

    Returns (parsed_model, success). On failure returns (None, False).
    """
    extracted = extract_json_from_llm_content(content)
    if not extracted:
        return None, False

    # Workaround: some models insert newlines before closing quotes
    normalized = extracted.replace('\n"', '"')

    try:
        parser = PydanticOutputParser(output_cls=ValidationAgentResponseModel)
        parsed = parser.parse(normalized)
        return parsed, True
    except Exception:
        # Malformed JSON or schema mismatch: signal failure instead of
        # raising, so callers can degrade gracefully on bad LLM output.
        return None, False
run

CLI entry point for running the agentic PR review from JSON input.

main() -> None

Run agentic PR review from a JSON input file. Usage: generate_pr_review <input_json_file>.

Source code in packages/lampe-review/src/lampe/review/workflows/agentic_review/run.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def main() -> None:
    """Run agentic PR review from a JSON input file. Usage: generate_pr_review <input_json_file>."""
    # Local imports keep the module importable without loading the full
    # review stack until the entry point actually runs.
    from lampe.core import initialize
    from lampe.core.tools import clone_repo
    from lampe.review.workflows.agentic_review import generate_agentic_pr_review
    from lampe.review.workflows.pr_review.data_models import PRReviewInput

    initialize()

    if len(sys.argv) < 2:
        print("Usage: generate_pr_review <input_json_file>", file=sys.stderr)
        sys.exit(1)

    with open(sys.argv[1]) as f:
        data = json.load(f)

    # When a repository URL is provided, clone it locally (fetching the PR's
    # head/base refs) and rewrite the input to point at the local clone.
    repo_data = data.get("repository", {})
    if repo_data.get("url"):
        repository_path = clone_repo(
            repo_data["url"],
            head_ref=data.get("pull_request", {}).get("head_commit_hash"),
            base_ref=data.get("pull_request", {}).get("base_commit_hash"),
        )
        data = dict(data)
        data["repository"] = {"local_path": repository_path, "full_name": repo_data.get("full_name")}

    input_model = PRReviewInput.model_validate(data)

    result = asyncio.run(
        generate_agentic_pr_review(
            repository=input_model.repository,
            pull_request=input_model.pull_request,
            review_depth=input_model.review_depth,
            custom_guidelines=input_model.custom_guidelines,
            files_exclude_patterns=input_model.files_exclude_patterns or [],
        )
    )

    # Render the review as markdown on stdout: one section per agent, one
    # subsection per reviewed file; muted structured comments are suppressed.
    for agent_output in result.output:
        print(f"# Agent: {agent_output.agent_name}")
        print(f"**Focus Areas:** {', '.join(agent_output.focus_areas)}")
        print(f"**Global Summary:** {agent_output.summary}")
        print()
        for file_review in agent_output.reviews:
            print(f"## {file_review.file_path}")
            print(file_review.summary)
            for line_num, comment in file_review.line_comments.items():
                print(f"- Line {line_num}: {comment}")
            for c in file_review.structured_comments:
                if not c.muted:
                    print(f"- L{c.line_number} [{c.severity}] {c.comment}")
            print()
skill_selector

Skill selector and discovery for agentic review.

SkillInfo

Bases: BaseModel

Metadata and content of a discovered skill.

discover_skills(repo_path: str) -> list[SkillInfo]

Find all SKILL.md files in the repository.

Scans for any SKILL.md file under the repo root. Skips directories: .git, node_modules, __pycache__, .venv, venv, .tox, dist, build.

Returns: List of SkillInfo with path, name, description, and full content.

Source code in packages/lampe-review/src/lampe/review/workflows/agentic_review/skill_selector/skill_discovery.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def discover_skills(repo_path: str) -> list[SkillInfo]:
    """Find all SKILL.md files in the repository.

    Scans for any SKILL.md file under the repo root.
    Skips directories: .git, node_modules, __pycache__, .venv, venv, .tox, dist, build.

    Returns:
        List of SkillInfo with path, name, description, and full content.
    """
    root = Path(repo_path).resolve()
    if not (root.exists() and root.is_dir()):
        return []

    discovered: list[SkillInfo] = []
    visited: set[Path] = set()

    for candidate in root.rglob("SKILL.md"):
        # Ignore non-files, duplicates, and anything under a skipped directory.
        if not candidate.is_file() or candidate in visited or _should_skip(candidate, root):
            continue
        visited.add(candidate)

        try:
            text = candidate.read_text(encoding="utf-8")
        except Exception:
            # Unreadable file (permissions, encoding) -- skip it silently.
            continue

        meta, _ = _parse_frontmatter(text)
        # Frontmatter wins; fall back to the containing directory's name.
        discovered.append(
            SkillInfo(
                path=str(candidate),
                name=meta.get("name", candidate.parent.name),
                description=meta.get("description", ""),
                content=text,
            )
        )
    return discovered
select_applicable_skills(pr_intent: PRIntent, files_changed: str, skills: list[SkillInfo], llm: LiteLLM | None = None) -> list[SkillInfo] async

Select which skills from the repo apply to this PR.

Do not call this when skills is empty - the workflow should skip skill selection.

Args: pr_intent: Extracted PR intent files_changed: List of changed files skills: List of discovered skills (must be non-empty) llm: Optional LLM instance

Returns: List of SkillInfo that apply to this PR

Source code in packages/lampe-review/src/lampe/review/workflows/agentic_review/skill_selector/skill_selector_agent.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
async def select_applicable_skills(
    pr_intent: PRIntent,
    files_changed: str,
    skills: list[SkillInfo],
    llm: LiteLLM | None = None,
) -> list[SkillInfo]:
    """Select which skills from the repo apply to this PR.

    Do not call this when skills is empty - the workflow should skip skill selection.

    Args:
        pr_intent: Extracted PR intent
        files_changed: List of changed files
        skills: List of discovered skills (must be non-empty)
        llm: Optional LLM instance

    Returns:
        List of SkillInfo that apply to this PR
    """
    if not skills:
        return []

    logger = logging.getLogger(LAMPE_LOGGER_NAME)
    _llm = llm or LiteLLM(model=get_model("LAMPE_MODEL_REVIEW_INTENT", MODELS.GPT_5_2_CODEX), temperature=1)

    # One bullet per discovered skill; the LLM selects by echoing paths back.
    skills_list = "\n".join(f'- path: "{s.path}" | name: {s.name} | description: {s.description}' for s in skills)

    prompt_template = f"{SKILL_SELECTOR_SYSTEM_PROMPT}\n\n{SKILL_SELECTOR_USER_PROMPT}"

    try:
        program = FunctionCallingProgram.from_defaults(
            output_cls=SkillSelectionOutput,
            llm=_llm,
            prompt_template_str=prompt_template,
            tool_required=True,
        )
        result = await program.acall(
            pr_intent_summary=pr_intent.summary,
            areas_touched=", ".join(pr_intent.areas_touched) or "unknown",
            suggested_tasks=", ".join(pr_intent.suggested_validation_tasks) or "general review",
            files_changed=files_changed,
            skills_list=skills_list,
        )
        # The structured-output program may hand back the model, a list of
        # models, or None; normalize all three shapes to a list of paths.
        if result is None:
            logger.warning("Skill selector returned None (LLM may not have invoked structured output)")
            selected_paths = []
        elif isinstance(result, SkillSelectionOutput):
            selected_paths = result.selected_skill_paths
        elif isinstance(result, list) and result:
            selected_paths = result[0].selected_skill_paths
        else:
            logger.warning(f"Skill selector returned unexpected type: {type(result)}")
            selected_paths = []
    except Exception as e:
        # Skill selection is best-effort: any LLM/tooling failure degrades to
        # "no skills selected" rather than failing the review.
        logger.warning(
            f"Skill selector failed, defaulting to no skills: {e}",
            exc_info=True,
        )
        selected_paths = []

    # Keep only skills whose exact path was returned (guards hallucinated paths).
    path_set = set(selected_paths)
    selected = [s for s in skills if s.path in path_set]
    if skills and not selected:
        logger.debug(f"Skill selector chose no skills from {len(skills)} available")
    return selected
skill_discovery

Discover SKILL.md files within the reviewed repository.

SkillInfo

Bases: BaseModel

Metadata and content of a discovered skill.

discover_skills(repo_path: str) -> list[SkillInfo]

Find all SKILL.md files in the repository.

Scans for any SKILL.md file under the repo root. Skips directories: .git, node_modules, __pycache__, .venv, venv, .tox, dist, build.

Returns: List of SkillInfo with path, name, description, and full content.

Source code in packages/lampe-review/src/lampe/review/workflows/agentic_review/skill_selector/skill_discovery.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def discover_skills(repo_path: str) -> list[SkillInfo]:
    """Find all SKILL.md files in the repository.

    Scans for any SKILL.md file under the repo root.
    Skips directories: .git, node_modules, __pycache__, .venv, venv, .tox, dist, build.

    Returns:
        List of SkillInfo with path, name, description, and full content.
    """
    base = Path(repo_path).resolve()
    if not base.exists() or not base.is_dir():
        # Nothing to scan: missing path or not a directory.
        return []

    skills: list[SkillInfo] = []
    seen_paths: set[Path] = set()

    for skill_file in base.rglob("SKILL.md"):
        if not skill_file.is_file():
            continue
        if skill_file in seen_paths:
            continue
        if _should_skip(skill_file, base):
            # Inside one of the excluded directories (.git, node_modules, ...).
            continue

        seen_paths.add(skill_file)

        try:
            content = skill_file.read_text(encoding="utf-8")
        except Exception:
            # Unreadable file (permissions/encoding) -- skip rather than fail discovery.
            continue

        metadata, _ = _parse_frontmatter(content)
        skill_dir = skill_file.parent
        # Frontmatter name wins; fall back to the containing directory's name.
        name = metadata.get("name", skill_dir.name)
        description = metadata.get("description", "")

        skills.append(
            SkillInfo(
                path=str(skill_file),
                name=name,
                description=description,
                content=content,
            )
        )
    return skills
skill_selector_agent

Skill Selector Agent - selects which skills apply to a given PR.

SkillSelectionOutput

Bases: BaseModel

Structured output from the Skill Selector Agent.

select_applicable_skills(pr_intent: PRIntent, files_changed: str, skills: list[SkillInfo], llm: LiteLLM | None = None) -> list[SkillInfo] async

Select which skills from the repo apply to this PR.

Do not call this when skills is empty - the workflow should skip skill selection.

Args: pr_intent: Extracted PR intent files_changed: List of changed files skills: List of discovered skills (must be non-empty) llm: Optional LLM instance

Returns: List of SkillInfo that apply to this PR

Source code in packages/lampe-review/src/lampe/review/workflows/agentic_review/skill_selector/skill_selector_agent.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
async def select_applicable_skills(
    pr_intent: PRIntent,
    files_changed: str,
    skills: list[SkillInfo],
    llm: LiteLLM | None = None,
) -> list[SkillInfo]:
    """Select which skills from the repo apply to this PR.

    Do not call this when skills is empty - the workflow should skip skill selection.

    Args:
        pr_intent: Extracted PR intent
        files_changed: List of changed files
        skills: List of discovered skills (must be non-empty)
        llm: Optional LLM instance

    Returns:
        List of SkillInfo that apply to this PR
    """
    if not skills:
        return []

    logger = logging.getLogger(LAMPE_LOGGER_NAME)
    _llm = llm or LiteLLM(model=get_model("LAMPE_MODEL_REVIEW_INTENT", MODELS.GPT_5_2_CODEX), temperature=1)

    # One bullet per discovered skill; the LLM selects by echoing paths back.
    skills_list = "\n".join(f'- path: "{s.path}" | name: {s.name} | description: {s.description}' for s in skills)

    prompt_template = f"{SKILL_SELECTOR_SYSTEM_PROMPT}\n\n{SKILL_SELECTOR_USER_PROMPT}"

    try:
        program = FunctionCallingProgram.from_defaults(
            output_cls=SkillSelectionOutput,
            llm=_llm,
            prompt_template_str=prompt_template,
            tool_required=True,
        )
        result = await program.acall(
            pr_intent_summary=pr_intent.summary,
            areas_touched=", ".join(pr_intent.areas_touched) or "unknown",
            suggested_tasks=", ".join(pr_intent.suggested_validation_tasks) or "general review",
            files_changed=files_changed,
            skills_list=skills_list,
        )
        # The structured-output program may hand back the model, a list of
        # models, or None; normalize all three shapes to a list of paths.
        if result is None:
            logger.warning("Skill selector returned None (LLM may not have invoked structured output)")
            selected_paths = []
        elif isinstance(result, SkillSelectionOutput):
            selected_paths = result.selected_skill_paths
        elif isinstance(result, list) and result:
            selected_paths = result[0].selected_skill_paths
        else:
            logger.warning(f"Skill selector returned unexpected type: {type(result)}")
            selected_paths = []
    except Exception as e:
        # Skill selection is best-effort: any LLM/tooling failure degrades to
        # "no skills selected" rather than failing the review.
        logger.warning(
            f"Skill selector failed, defaulting to no skills: {e}",
            exc_info=True,
        )
        selected_paths = []

    # Keep only skills whose exact path was returned (guards hallucinated paths).
    path_set = set(selected_paths)
    selected = [s for s in skills if s.path in path_set]
    if skills and not selected:
        logger.debug(f"Skill selector chose no skills from {len(skills)} available")
    return selected
skill_selector_prompt

Prompt for the Skill Selector Agent.

validation

Validation agents for task-based code verification.

BasicValidationAgent(*args, **kwargs)

Bases: ValidationAgent

Validation agent without skill content. Executes orchestrator-formulated tasks.

Source code in packages/lampe-review/src/lampe/review/workflows/agentic_review/validation/basic_validation_agent.py
15
16
def __init__(self, *args, **kwargs) -> None:
    """Initialize the agent with empty skill content (no skill augmentation)."""
    # Unpack *args before the keyword (ruff B026): binding semantics are the
    # same, but keyword-before-*args call ordering is misleading to readers.
    super().__init__(*args, skill_content="", **kwargs)
SkillAugmentedValidationAgent(skill_content: str, *args, **kwargs)

Bases: ValidationAgent

Validation agent with skill content. The skill defines what to validate.

Source code in packages/lampe-review/src/lampe/review/workflows/agentic_review/validation/skill_augmented_validation_agent.py
19
20
def __init__(self, skill_content: str, *args, **kwargs) -> None:
    """Initialize the agent with the given skill content for prompt augmentation."""
    # Unpack *args before the keyword (ruff B026); argument binding unchanged.
    super().__init__(*args, skill_content=skill_content, **kwargs)
basic_validation_agent

Basic validation agent - no skill augmentation.

BasicValidationAgent(*args, **kwargs)

Bases: ValidationAgent

Validation agent without skill content. Executes orchestrator-formulated tasks.

Source code in packages/lampe-review/src/lampe/review/workflows/agentic_review/validation/basic_validation_agent.py
15
16
def __init__(self, *args, **kwargs) -> None:
    """Initialize the agent with empty skill content (no skill augmentation)."""
    # Unpack *args before the keyword (ruff B026): binding semantics are the
    # same, but keyword-before-*args call ordering is misleading to readers.
    super().__init__(*args, skill_content="", **kwargs)
ValidationAgentComplete

Bases: StopEvent

Stop event for validation agent.

ValidationAgentStart

Bases: StartEvent

Start event for validation agent.

skill_augmented_validation_agent

Skill-augmented validation agent - with domain-specific skill content.

SkillAugmentedValidationAgent(skill_content: str, *args, **kwargs)

Bases: ValidationAgent

Validation agent with skill content. The skill defines what to validate.

Source code in packages/lampe-review/src/lampe/review/workflows/agentic_review/validation/skill_augmented_validation_agent.py
19
20
def __init__(self, skill_content: str, *args, **kwargs) -> None:
    """Initialize the agent with the given skill content for prompt augmentation."""
    # Unpack *args before the keyword (ruff B026); argument binding unchanged.
    super().__init__(*args, skill_content=skill_content, **kwargs)
ValidationAgentComplete

Bases: StopEvent

Stop event for validation agent.

ValidationAgentStart

Bases: StartEvent

Start event for validation agent.

validation_agent

Base validation agent - task-based verification.

ValidationAgent(skill_content: str = '', llm: LiteLLM | None = None, *args: Any, **kwargs: Any)

Bases: FunctionCallingAgent

Base validation agent that executes a single verification task.

Source code in packages/lampe-review/src/lampe/review/workflows/agentic_review/validation/validation_agent.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def __init__(self, skill_content: str = "", llm: LiteLLM | None = None, *args: Any, **kwargs: Any) -> None:
    """Initialize the validation agent.

    Args:
        skill_content: Optional skill text appended to the system prompt;
            empty string means no skill augmentation.
        llm: Optional LLM override; defaults to the configured validation model.
    """
    system_prompt = VALIDATION_AGENT_BASE_SYSTEM_PROMPT

    llm = llm or LiteLLM(
        model=get_model("LAMPE_MODEL_REVIEW_VALIDATION", MODELS.GPT_5_1_CODEX_MINI),
        temperature=1.0,
        reasoning_effort="low",
    )
    if skill_content:
        # Imported lazily; only needed when a skill augments the prompt.
        from lampe.review.workflows.agentic_review.validation.validation_agent_prompt import (
            SKILL_CONTENT_SECTION,
        )

        system_prompt += SKILL_CONTENT_SECTION.format(skill_content=skill_content)

    super().__init__(
        *args,
        tools=git_tools_gpt_5_nano_agent_prompt,
        system_prompt=system_prompt,
        llm=llm,
        **kwargs,
    )
    self.logger = logging.getLogger(LAMPE_LOGGER_NAME)
handle_agent_completion(ctx: Context, ev: AgentCompleteEvent) -> ValidationAgentComplete async

Parse agent output into ValidationResult.

Source code in packages/lampe-review/src/lampe/review/workflows/agentic_review/validation/validation_agent.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
@step
async def handle_agent_completion(self, ctx: Context, ev: AgentCompleteEvent) -> ValidationAgentComplete:
    """Parse agent output into ValidationResult."""
    # Get task_id from input - we need to stash it in context
    input_event = await ctx.store.get("validation_input")
    # Fall back to "unknown" if the setup step never stored the input.
    task_id = input_event.task.task_id if input_event else "unknown"

    findings, no_issue = self._parse_response(ev.output or "", ev.sources)
    result = ValidationResult(
        task_id=task_id,
        findings=findings,
        no_issue=no_issue,
        sources=ev.sources,
    )
    return ValidationAgentComplete(validation_result=result)
setup_query_and_tools(ctx: Context, ev: ValidationAgentStart) -> UserInputEvent async

Setup the validation task and tools.

Source code in packages/lampe-review/src/lampe/review/workflows/agentic_review/validation/validation_agent.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
@step
async def setup_query_and_tools(self, ctx: Context, ev: ValidationAgentStart) -> UserInputEvent:
    """Setup the validation task and tools."""
    inp = ev.input
    # Stash the input so handle_agent_completion can recover the task_id later.
    await ctx.store.set("validation_input", inp)

    query = VALIDATION_AGENT_USER_PROMPT.format(
        task_description=inp.task.description,
        repo_path=inp.repo_path,
        base_commit=inp.base_commit,
        head_commit=inp.head_commit,
        files_changed=inp.files_changed,
    )

    # Pre-bind repo/commit parameters on the tools so the LLM only has to
    # supply the remaining arguments per call.
    self.update_tools(
        partial_params={
            "repo_path": inp.repo_path,
            "base_reference": inp.base_commit,
            "head_reference": inp.head_commit,
            "commit_hash": inp.head_commit,
            "commit_reference": inp.head_commit,
            "include_line_numbers": True,
        }
    )
    return UserInputEvent(input=query)
ValidationAgentComplete

Bases: StopEvent

Stop event for validation agent.

ValidationAgentStart

Bases: StartEvent

Start event for validation agent.

validation_agent_prompt

Prompt for validation agents (task-based verification).

pr_review

Shared data models and LLM aggregation for PR review (used by agentic workflow).

agents

Mute-issue aggregation agent used by the agentic review LLM aggregation step.

MuteIssueAggregationAgent(*args: Any, **kwargs: Any)

Bases: FunctionCallingAgent

Agent that runs mute_issue tool calls and stores results in context.

Source code in packages/lampe-review/src/lampe/review/workflows/pr_review/agents/mute_issue_aggregation_agent.py
50
51
52
53
54
55
56
57
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Initialize with the mute_issue tool and aggregation system prompt.

    Callers may override either via the ``tools`` / ``system_prompt`` kwargs.
    """
    # Default to the single mute_issue tool unless the caller supplied tools.
    tools = kwargs.pop("tools", None) or [self._create_mute_issue_tool()]
    # Unpack *args before keywords (ruff B026); argument binding unchanged.
    super().__init__(
        *args,
        tools=tools,
        system_prompt=kwargs.pop("system_prompt", MUTE_ISSUE_AGGREGATION_AGENT_SYSTEM_PROMPT),
        **kwargs,
    )
setup(ctx: WorkflowContext, ev: MuteIssueStart) -> UserInputEvent async

Convert input to user prompt for the agent.

Source code in packages/lampe-review/src/lampe/review/workflows/pr_review/agents/mute_issue_aggregation_agent.py
59
60
61
62
@step
async def setup(self, ctx: WorkflowContext, ev: MuteIssueStart) -> UserInputEvent:
    """Convert input to user prompt for the agent."""
    # The start event already carries the fully formatted prompt; just forward it.
    return UserInputEvent(input=ev.user_prompt)
MuteIssueStart

Bases: StartEvent

Start event for mute-issue aggregation.

mute_issue_aggregation_agent

Agent that runs mute_issue tool calls for cleaning and deduplicating review comments.

MuteIssueAggregationAgent(*args: Any, **kwargs: Any)

Bases: FunctionCallingAgent

Agent that runs mute_issue tool calls and stores results in context.

Source code in packages/lampe-review/src/lampe/review/workflows/pr_review/agents/mute_issue_aggregation_agent.py
50
51
52
53
54
55
56
57
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Initialize with the mute_issue tool and aggregation system prompt.

    Callers may override either via the ``tools`` / ``system_prompt`` kwargs.
    """
    # Default to the single mute_issue tool unless the caller supplied tools.
    tools = kwargs.pop("tools", None) or [self._create_mute_issue_tool()]
    # Unpack *args before keywords (ruff B026); argument binding unchanged.
    super().__init__(
        *args,
        tools=tools,
        system_prompt=kwargs.pop("system_prompt", MUTE_ISSUE_AGGREGATION_AGENT_SYSTEM_PROMPT),
        **kwargs,
    )
setup(ctx: WorkflowContext, ev: MuteIssueStart) -> UserInputEvent async

Convert input to user prompt for the agent.

Source code in packages/lampe-review/src/lampe/review/workflows/pr_review/agents/mute_issue_aggregation_agent.py
59
60
61
62
@step
async def setup(self, ctx: WorkflowContext, ev: MuteIssueStart) -> UserInputEvent:
    """Convert input to user prompt for the agent."""
    # The start event already carries the fully formatted prompt; just forward it.
    return UserInputEvent(input=ev.user_prompt)
MuteIssueStart

Bases: StartEvent

Start event for mute-issue aggregation.

mute_issue_aggregation_agent_prompt

Prompt for mute-issue aggregation agent that cleans and deduplicates review comments.

data_models
AgentReviewOutput

Bases: BaseModel

Output from individual specialized agents.

to_lightweight_dict() -> dict[str, Any]

Convert to dictionary with lightweight sources for aggregation.

Source code in packages/lampe-review/src/lampe/review/workflows/pr_review/data_models.py
143
144
145
146
147
148
149
150
151
152
153
154
155
def to_lightweight_dict(self) -> dict[str, Any]:
    """Convert to dictionary with lightweight sources for aggregation."""
    # Drop each source's tool_output so the aggregation payload stays small.
    slim_sources = []
    for src in self.sources:
        slim = LightweightToolSource(tool_name=src.tool_name, tool_kwargs=src.tool_kwargs)
        slim_sources.append(slim.model_dump())
    return {
        "agent_name": self.agent_name,
        "focus_areas": self.focus_areas,
        "reviews": [r.model_dump() for r in self.reviews],
        "sources": slim_sources,
        "summary": self.summary,
    }
FileReview

Bases: BaseModel

Review for a specific file with inline comments.

IssueWithId

Bases: BaseModel

Single issue with its mute ID, for use in aggregator/hallucination prompts.

build_from_agent_reviews(reviews: list[AgentReviewOutput]) -> list[IssueWithId] classmethod

Build IssueWithId list from agent reviews. Reusable for aggregator and hallucination filter.

Source code in packages/lampe-review/src/lampe/review/workflows/pr_review/data_models.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
@classmethod
def build_from_agent_reviews(cls, reviews: list["AgentReviewOutput"]) -> list["IssueWithId"]:
    """Build IssueWithId list from agent reviews. Reusable for aggregator and hallucination filter."""
    issues: list[IssueWithId] = []
    for agent_idx, agent_output in enumerate(reviews):
        for file_idx, file_review in enumerate(agent_output.reviews):
            # Structured comments: id encodes agent index / file index / "s" / comment index.
            for comment_idx, rc in enumerate(file_review.structured_comments):
                issues.append(
                    cls(
                        issue_id=f"{agent_idx}|{file_idx}|s|{comment_idx}",
                        agent=agent_output.agent_name,
                        file=file_review.file_path,
                        line=rc.line_number,
                        severity=rc.severity,
                        category=rc.category,
                        comment=rc.comment,
                    )
                )
            # Plain line comments: id encodes agent index / file index / "l" /
            # line number, with placeholder severity/category since these
            # comments carry no structured metadata.
            for line_num, comment_text in file_review.line_comments.items():
                issues.append(
                    cls(
                        issue_id=f"{agent_idx}|{file_idx}|l|{line_num}",
                        agent=agent_output.agent_name or "unknown",
                        file=file_review.file_path,
                        line=line_num,
                        severity="n/a",
                        category="line_comment",
                        comment=comment_text,
                    )
                )
    return issues
format_list_for_prompt(issues: list[IssueWithId]) -> str staticmethod

Format a list of issues as markdown for the LLM prompt.

Source code in packages/lampe-review/src/lampe/review/workflows/pr_review/data_models.py
126
127
128
129
130
131
@staticmethod
def format_list_for_prompt(issues: list["IssueWithId"]) -> str:
    """Format a list of issues as markdown for the LLM prompt."""
    if not issues:
        return "_No issues to review._"
    return "\n\n".join(issue.to_markdown_block() for issue in issues)
to_markdown_block() -> str

Format this issue as a markdown block for LLM prompts.

Source code in packages/lampe-review/src/lampe/review/workflows/pr_review/data_models.py
82
83
84
85
86
87
88
89
90
91
92
def to_markdown_block(self) -> str:
    """Format this issue as a markdown block for LLM prompts."""
    # Fill the shared template and trim surrounding whitespace so blocks can
    # be joined cleanly with blank-line separators.
    return ISSUE_BLOCK_TEMPLATE.format(
        issue_id=self.issue_id,
        agent=self.agent,
        file=self.file,
        line=self.line,
        severity=self.severity,
        category=self.category,
        comment=self.comment,
    ).strip()
LightweightToolSource

Bases: BaseModel

Lightweight version of ToolSource without tool_output for aggregation.

PRReivewAggregatorOutput

Bases: BaseModel

Output model for PR review aggregation.

PRReviewInput

Bases: BaseModel

Input for PR review generation workflow.

ReviewComment

Bases: BaseModel

Structured comment with metadata.

ReviewDepth

Bases: str, Enum

Review depth levels for PR reviews.

llm_aggregation_step

LLM-based aggregation workflow for cleaning and deduplicating review comments.

Uses a tool-based approach: the LLM calls mute_issue(issue_id) for each issue to mute. Original reviews are kept with muted flags applied.

LLMAggregationCompleteEvent

Bases: StopEvent

Complete event for LLM aggregation workflow.

LLMAggregationStartEvent

Bases: StartEvent

Start event for LLM aggregation workflow.

LLMAggregationWorkflow(timeout: int | None = None, verbose: bool = False, max_tool_iterations: int = 5, llm: Any | None = None, *args: Any, **kwargs: Any)

Bases: Workflow

Workflow for aggregating and cleaning review comments using LLM tool calls.

The LLM calls mute_issue(issue_id) for each issue to mute. Original reviews are preserved with muted flags applied based on tool calls.

Source code in packages/lampe-review/src/lampe/review/workflows/pr_review/llm_aggregation_step.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
def __init__(
    self,
    timeout: int | None = None,
    verbose: bool = False,
    max_tool_iterations: int = 5,
    llm: Any | None = None,
    *args: Any,
    **kwargs: Any,
):
    """Initialize the aggregation workflow.

    Args:
        timeout: Workflow timeout, forwarded to both this workflow and the inner agent.
        verbose: Enable debug logging of aggregation progress.
        max_tool_iterations: Cap on mute_issue tool-call rounds for the agent.
        llm: Optional LLM override; defaults to the configured aggregation model.
    """
    super().__init__(*args, timeout=timeout, verbose=verbose, **kwargs)
    self.verbose = verbose
    self.logger = logging.getLogger(name=LAMPE_LOGGER_NAME)
    self.llm = llm or LiteLLM(
        model=get_model("LAMPE_MODEL_REVIEW_AGGREGATION", MODELS.GPT_5_2025_08_07),
        temperature=1,
        reasoning_effort="low",
    )
    self.max_tool_iterations = max_tool_iterations
    # The inner agent performs the actual mute_issue tool calls.
    self._agent = MuteIssueAggregationAgent(
        llm=self.llm,
        max_iterations=self.max_tool_iterations,
        timeout=timeout,
    )
aggregate_reviews(ctx: WorkflowContext, ev: LLMAggregationStartEvent) -> LLMAggregationCompleteEvent async

Aggregate and clean reviews using LLM with mute_issue tool calls.

Source code in packages/lampe-review/src/lampe/review/workflows/pr_review/llm_aggregation_step.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
@step
async def aggregate_reviews(
    self, ctx: WorkflowContext, ev: LLMAggregationStartEvent
) -> LLMAggregationCompleteEvent:
    """Aggregate and clean reviews using LLM with mute_issue tool calls."""
    if not ev.agent_reviews:
        if self.verbose:
            self.logger.debug("No agent reviews to aggregate")
        return LLMAggregationCompleteEvent(aggregated_reviews=[])

    if self.verbose:
        self.logger.debug(f"Aggregating {len(ev.agent_reviews)} agent reviews via mute_issue tool...")

    issues_with_ids = _build_issues_with_ids(ev.agent_reviews)
    tools_used = _format_sources_for_display(ev.agent_reviews)
    user_prompt = MUTE_ISSUE_AGGREGATION_USER_PROMPT.format(
        files_changed=ev.files_changed,
        issues_with_ids=issues_with_ids,
        tools_used=tools_used,
    )

    try:
        # Run the mute-issue agent with a fresh context; its tool calls record
        # muted issue ids (with reasons) into the context store.
        agent_ctx = WorkflowContext(self._agent)
        await agent_ctx.store.set("muted_reasons", {})
        await self._agent.run(
            start_event=MuteIssueStart(user_prompt=user_prompt),
            ctx=agent_ctx,
        )
        muted_reasons = await agent_ctx.store.get("muted_reasons", default={})
        muted_reasons = dict(muted_reasons) if muted_reasons else {}
        aggregated_reviews = _apply_muted_flags(ev.agent_reviews, muted_reasons)

        if self.verbose:
            self.logger.debug(f"Aggregation complete: muted {len(muted_reasons)} issues")

    except Exception as e:
        # Aggregation is best-effort: on any failure keep the unfiltered reviews.
        self.logger.exception(f"Failed to aggregate reviews: {e}")
        if self.verbose:
            self.logger.debug("Falling back to original reviews")
        aggregated_reviews = ev.agent_reviews

    if self.verbose:
        self.logger.debug(f"Aggregation complete: {len(aggregated_reviews)} reviews with muted flags")

    return LLMAggregationCompleteEvent(aggregated_reviews=aggregated_reviews)
quick_review

Quick review workflow — context-window-aware, grep-first, Claude 4.5 with extended thinking.

QuickReviewComplete

Bases: StopEvent

Complete event for quick review workflow.

QuickReviewStart

Bases: StartEvent

Start event for quick review workflow.

generate_quick_pr_review(repository: Repository, pull_request: PullRequest, timeout: int | None = None, verbose: bool = False) -> QuickReviewComplete async

Generate a quick PR review using the single-agent quick review workflow.

Source code in packages/lampe-review/src/lampe/review/workflows/quick_review/quick_review_workflow.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
async def generate_quick_pr_review(
    repository: Repository,
    pull_request: PullRequest,
    timeout: int | None = None,
    verbose: bool = False,
) -> QuickReviewComplete:
    """Generate a quick PR review using the single-agent quick review workflow.

    Args:
        repository: Repository under review.
        pull_request: Pull request whose diff is reviewed.
        timeout: Optional workflow timeout forwarded to the workflow.
        verbose: Enable debug logging inside the workflow.

    Returns:
        The QuickReviewComplete stop event carrying the review output.
    """
    # Bundle repo + PR into the workflow input, then drive the workflow to completion.
    review_input = PRReviewInput(repository=repository, pull_request=pull_request)
    runner = QuickReviewWorkflow(timeout=timeout, verbose=verbose)
    outcome: QuickReviewComplete = await runner.run(start_event=QuickReviewStart(input=review_input))
    return outcome
hallucination_filter_prompt

Prompt for hallucination filter — mutes comments that ask the user to investigate instead of stating verified bugs.

hallucination_filter_step

Hallucination filter step — uses cheap LLM with mute_issue tool to remove investigation-request comments.

HallucinationFilterCompleteEvent

Bases: StopEvent

Complete event for hallucination filter.

HallucinationFilterStartEvent

Bases: StartEvent

Start event for hallucination filter.

HallucinationFilterWorkflow(timeout: int | None = None, verbose: bool = False, max_tool_iterations: int = 5, llm: Any | None = None, *args: Any, **kwargs: Any)

Bases: Workflow

Workflow to mute investigation-request comments using a cheap LLM and mute_issue tool.

Source code in packages/lampe-review/src/lampe/review/workflows/quick_review/hallucination_filter_step.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def __init__(
    self,
    timeout: int | None = None,
    verbose: bool = False,
    max_tool_iterations: int = 5,
    llm: Any | None = None,
    *args: Any,
    **kwargs: Any,
):
    """Set up the hallucination-filter workflow and its mute_issue agent.

    Args:
        timeout: Optional timeout forwarded to the base Workflow and the agent.
        verbose: Enable debug logging.
        max_tool_iterations: Cap on the agent's tool-call iterations.
        llm: Optional LLM override; when omitted, the model is resolved from
            the LAMPE_MODEL_QUICK_REVIEW_HALLUCINATION_FILTER env var with a
            GPT-5-nano fallback.
    """
    super().__init__(*args, timeout=timeout, verbose=verbose, **kwargs)
    self.verbose = verbose
    self.max_tool_iterations = max_tool_iterations
    self.logger = logging.getLogger(name=LAMPE_LOGGER_NAME)
    # Default to the configured cheap model when no LLM is injected.
    if llm is None:
        llm = LiteLLM(
            model=get_model("LAMPE_MODEL_QUICK_REVIEW_HALLUCINATION_FILTER", MODELS.GPT_5_NANO_2025_08_07),
            temperature=1,
        )
    self.llm = llm
    # Agent that exposes the mute_issue tool; the filter step later reads the
    # "muted_reasons" entry back out of its context store.
    self._agent = MuteIssueAggregationAgent(
        llm=self.llm,
        max_iterations=self.max_tool_iterations,
        timeout=timeout,
        system_prompt=HALLUCINATION_FILTER_SYSTEM_PROMPT,
    )
filter_hallucinations(ctx: WorkflowContext, ev: HallucinationFilterStartEvent) -> HallucinationFilterCompleteEvent async

Run hallucination filter: mute comments that ask reader to investigate.

Source code in packages/lampe-review/src/lampe/review/workflows/quick_review/hallucination_filter_step.py
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
@step
async def filter_hallucinations(
    self, ctx: WorkflowContext, ev: HallucinationFilterStartEvent
) -> HallucinationFilterCompleteEvent:
    """Run hallucination filter: mute comments that ask reader to investigate.

    Falls back to the unfiltered reviews when the agent run raises.
    """
    reviews = ev.agent_reviews
    if not reviews:
        return HallucinationFilterCompleteEvent(filtered_reviews=[])

    # Nothing to filter when the formatter reports an empty issue list.
    issue_listing = _build_issues_with_ids(reviews)
    if "_No issues to review._" in issue_listing:
        return HallucinationFilterCompleteEvent(filtered_reviews=reviews)

    if self.verbose:
        self.logger.debug("Running hallucination filter via mute_issue tool...")

    prompt = HALLUCINATION_FILTER_USER_PROMPT.format(
        files_changed=ev.files_changed,
        issues_with_ids=issue_listing,
    )

    # Default: pass the reviews through untouched if anything below fails.
    result = reviews
    try:
        agent_ctx = WorkflowContext(self._agent)
        # Seed the store slot the agent writes into (presumably issue-id -> reason;
        # consumed by _apply_muted_flags — confirm against the agent implementation).
        await agent_ctx.store.set("muted_reasons", {})
        await self._agent.run(
            start_event=MuteIssueStart(user_prompt=prompt),
            ctx=agent_ctx,
        )
        stored = await agent_ctx.store.get("muted_reasons", default={})
        mute_map = dict(stored) if stored else {}
        result = _apply_muted_flags(reviews, mute_map)

        if self.verbose and mute_map:
            self.logger.debug(f"Hallucination filter muted {len(mute_map)} issues")

    except Exception as e:
        self.logger.exception(f"Hallucination filter failed: {e}")

    return HallucinationFilterCompleteEvent(filtered_reviews=result)
quick_review_agent

Quick review agent — context-window-aware, grep-first with env-configurable model.

QuickReviewAgent(llm: LiteLLM | None = None, *args: Any, **kwargs: Any)

Bases: FunctionCallingAgent

Lightweight review agent: grep-first, small reads. Model via LAMPE_MODEL_QUICK_REVIEW.

Source code in packages/lampe-review/src/lampe/review/workflows/quick_review/quick_review_agent.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def __init__(self, llm: LiteLLM | None = None, *args: Any, **kwargs: Any) -> None:
    """Build the quick-review agent with its tools, system prompt, and model.

    Args:
        llm: Optional LLM override; defaults to the model named by the
            LAMPE_MODEL_QUICK_REVIEW env var (GPT-5 fallback), with
            medium reasoning effort.
    """
    if llm is None:
        # Env-configurable default model.
        llm = LiteLLM(
            model=get_model("LAMPE_MODEL_QUICK_REVIEW", MODELS.GPT_5_2025_08_07),
            temperature=1,
            reasoning_effort="medium",
        )
    super().__init__(
        *args,
        llm=llm,
        tools=quick_review_tools,
        system_prompt=QUICK_REVIEW_AGENT_SYSTEM_PROMPT,
        max_iterations=10,
        timeout=None,
        **kwargs,
    )
    self.logger = logging.getLogger(LAMPE_LOGGER_NAME)
handle_agent_completion(ctx: Context, ev: AgentCompleteEvent) -> ValidationAgentComplete async

Parse agent output into ValidationResult.

Source code in packages/lampe-review/src/lampe/review/workflows/quick_review/quick_review_agent.py
 98
 99
100
101
102
103
104
105
106
107
108
@step
async def handle_agent_completion(self, ctx: Context, ev: AgentCompleteEvent) -> ValidationAgentComplete:
    """Parse the agent's raw output into a ValidationResult and finish."""
    # Guard against a None output before parsing.
    raw_output = ev.output or ""
    findings, no_issue = self._parse_response(raw_output, ev.sources)
    return ValidationAgentComplete(
        validation_result=ValidationResult(
            task_id="quick-review",
            findings=findings,
            no_issue=no_issue,
            sources=ev.sources,
        )
    )
setup_query_and_tools(ctx: Context, ev: QuickReviewAgentStart) -> UserInputEvent async

Setup the quick review query and pre-fill tool params.

Source code in packages/lampe-review/src/lampe/review/workflows/quick_review/quick_review_agent.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
@step
async def setup_query_and_tools(self, ctx: Context, ev: QuickReviewAgentStart) -> UserInputEvent:
    """Build the review query and pre-bind fixed repo/commit params on the tools."""
    review_input = ev.input
    await ctx.store.set("quick_review_input", review_input)

    # Pre-fill every tool parameter that is constant for this review so the
    # agent only supplies the varying arguments (paths, patterns, ...).
    shared_params = {
        "repo_path": review_input.repo_path,
        "base_reference": review_input.base_commit,
        "head_reference": review_input.head_commit,
        "commit_reference": review_input.head_commit,
        "commit_hash": review_input.head_commit,
        "include_line_numbers": True,
    }
    self.update_tools(partial_params=shared_params)

    prompt = QUICK_REVIEW_AGENT_USER_PROMPT.format(
        repo_path=review_input.repo_path,
        base_commit=review_input.base_commit,
        head_commit=review_input.head_commit,
        files_changed=review_input.files_changed,
    )
    return UserInputEvent(input=prompt)
QuickReviewAgentStart

Bases: StartEvent

Start event for quick review agent.

QuickReviewInput(repo_path: str, base_commit: str, head_commit: str, files_changed: str)

Input for quick review (no task — agent thinks about what to verify).

Source code in packages/lampe-review/src/lampe/review/workflows/quick_review/quick_review_agent.py
34
35
36
37
38
39
40
41
42
43
44
def __init__(
    self,
    repo_path: str,
    base_commit: str,
    head_commit: str,
    files_changed: str,
) -> None:
    """Capture the inputs the quick-review agent needs.

    Args:
        repo_path: Local path of the checked-out repository.
        base_commit: Base commit reference of the diff.
        head_commit: Head commit reference of the diff.
        files_changed: Pre-rendered listing of changed files.
    """
    # Plain value holder — no validation or normalization is applied.
    self.files_changed = files_changed
    self.head_commit = head_commit
    self.base_commit = base_commit
    self.repo_path = repo_path
quick_review_agent_prompt

Prompt for quick review agent — context-window-aware, single-file diff + targeted investigation.

quick_review_workflow

Quick review workflow — single agent, grep-first, minimal context.

QuickReviewComplete

Bases: StopEvent

Complete event for quick review workflow.

QuickReviewStart

Bases: StartEvent

Start event for quick review workflow.

QuickReviewWorkflow(timeout: int | None = None, verbose: bool = False, *args, **kwargs)

Bases: Workflow

Simple workflow: list files → run quick review agent → convert to AgentReviewOutput.

Source code in packages/lampe-review/src/lampe/review/workflows/quick_review/quick_review_workflow.py
46
47
48
49
50
51
52
53
54
55
def __init__(self, timeout: int | None = None, verbose: bool = False, *args, **kwargs) -> None:
    """Wire up the quick-review agent and the hallucination-filter sub-workflow.

    Args:
        timeout: Optional timeout forwarded to the base Workflow and the sub-workflow.
        verbose: Enable debug logging (also forwarded to the sub-workflow).
    """
    super().__init__(*args, timeout=timeout, verbose=verbose, **kwargs)
    self.timeout = timeout
    self.verbose = verbose
    # NOTE(review): sibling components log via LAMPE_LOGGER_NAME while this uses a
    # hard-coded child logger name — confirm whether the divergence is intentional.
    self.logger = logging.getLogger("lampe.review.quick_review")
    self.agent = QuickReviewAgent()
    self.hallucination_filter = HallucinationFilterWorkflow(timeout=timeout, verbose=verbose)
run_quick_review(ctx: Context, ev: QuickReviewStart) -> QuickReviewComplete async

Run the quick review agent on changed files.

Source code in packages/lampe-review/src/lampe/review/workflows/quick_review/quick_review_workflow.py
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
@step
async def run_quick_review(self, ctx: Context, ev: QuickReviewStart) -> QuickReviewComplete:
    """Run the quick review agent on changed files, then filter hallucinations."""
    review_input = ev.input
    repo_path = review_input.repository.local_path
    base_commit = review_input.pull_request.base_commit_hash
    head_commit = review_input.pull_request.head_commit_hash

    files_changed = list_changed_files(
        repo_path=repo_path,
        base_reference=base_commit,
        head_reference=head_commit,
    )

    try:
        complete: ValidationAgentComplete = await self.agent.run(
            start_event=QuickReviewAgentStart(
                input=QuickReviewInput(
                    repo_path=repo_path,
                    base_commit=base_commit,
                    head_commit=head_commit,
                    files_changed=files_changed,
                )
            ),
        )
    except Exception:
        # Agent failure degrades to an empty review rather than failing the workflow.
        self.logger.exception("Quick review agent failed")
        return QuickReviewComplete(output=[])

    agent_outputs = _validation_results_to_agent_review_output([complete.validation_result])
    if not agent_outputs:
        return QuickReviewComplete(output=[])

    # Skip the hallucination filter when every review is empty — nothing to mute.
    any_findings = any(
        review.structured_comments or review.line_comments
        for output in agent_outputs
        for review in output.reviews
    )
    if not any_findings:
        return QuickReviewComplete(output=agent_outputs)

    filter_result: HallucinationFilterCompleteEvent = await self.hallucination_filter.run(
        start_event=HallucinationFilterStartEvent(
            agent_reviews=agent_outputs,
            files_changed=files_changed,
        ),
    )
    return QuickReviewComplete(output=filter_result.filtered_reviews)
generate_quick_pr_review(repository: Repository, pull_request: PullRequest, timeout: int | None = None, verbose: bool = False) -> QuickReviewComplete async

Generate a quick PR review using the single-agent quick review workflow.

Source code in packages/lampe-review/src/lampe/review/workflows/quick_review/quick_review_workflow.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
async def generate_quick_pr_review(
    repository: Repository,
    pull_request: PullRequest,
    timeout: int | None = None,
    verbose: bool = False,
) -> QuickReviewComplete:
    """Generate a quick PR review using the single-agent quick review workflow.

    Args:
        repository: Repository under review.
        pull_request: Pull request whose diff is reviewed.
        timeout: Optional timeout passed through to the workflow.
        verbose: Enable debug logging in the workflow.

    Returns:
        The QuickReviewComplete stop event with the final review output.
    """
    # Construct input and workflow, then await the full run.
    workflow = QuickReviewWorkflow(timeout=timeout, verbose=verbose)
    start = QuickReviewStart(
        input=PRReviewInput(repository=repository, pull_request=pull_request)
    )
    result: QuickReviewComplete = await workflow.run(start_event=start)
    return result

template

TemplateWorkflow

Bases: Workflow

A template workflow that demonstrates basic workflow structure.

This workflow is responsible for: - Demonstrating the basic workflow pattern - Showing how to handle events - Providing a template for new workflows

template_workflow

TemplateWorkflow

Bases: Workflow

A template workflow that demonstrates basic workflow structure.

This workflow is responsible for: - Demonstrating the basic workflow pattern - Showing how to handle events - Providing a template for new workflows