Developing AI Agents with Microsoft Promptflow: A Complete Guide with Slack Bot Use Cases

READER BEWARE: THE FOLLOWING WAS WRITTEN ENTIRELY BY AI WITHOUT HUMAN EDITING.

Introduction

Building AI agents has become increasingly accessible with the emergence of orchestration frameworks that abstract away the complexity of managing prompts, model interactions, and workflow logic. Microsoft’s Promptflow is one such framework that provides a powerful yet straightforward approach to developing, testing, and deploying AI-powered applications.

In this comprehensive guide, we’ll explore how to use Promptflow to build production-ready AI agents, covering everything from initial setup to live deployment. We’ll walk through two practical use cases—a Slack bot for monitoring channel discussions and a security review bot for open source repositories—and compare Promptflow with alternative frameworks like LangChain and Semantic Kernel.

What is Promptflow?

Promptflow is an open-source development framework from Microsoft designed to streamline the development lifecycle of AI applications powered by Large Language Models (LLMs). It provides tools for:

  • Flow Development: Visual and code-based tools for creating LLM workflows
  • Prompt Engineering: Systematic approach to crafting and refining prompts
  • Testing and Evaluation: Built-in evaluation capabilities for quality assurance
  • Deployment: Integration with Azure AI and other deployment targets
  • Tracing and Debugging: Comprehensive logging and debugging tools

Key Concepts

Flows: The core building block in Promptflow. A flow is a directed acyclic graph (DAG) that connects various nodes including LLM calls, Python functions, and tools.

Nodes: Individual processing units within a flow. Types include:

  • LLM nodes: Calls to language models (OpenAI, Azure OpenAI, etc.)
  • Python nodes: Custom Python code execution
  • Prompt nodes: Prompt template processing

Connections: Secure storage for API keys and endpoints for LLM services.

Runs: Execution instances of flows that capture inputs, outputs, and metrics.
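Because a flow is a DAG, node execution order is simply a topological sort of the dependency graph. A minimal illustration using only the standard library (this is not Promptflow internals; the node names echo the example flow later in this guide):

```python
# Each node lists the nodes it depends on, mirroring how flow.dag.yaml
# references outputs like ${classify_relevance.output}.
from graphlib import TopologicalSorter

deps = {
    "classify_relevance": set(),              # depends only on flow inputs
    "analyze_message": {"classify_relevance"} # consumes the classifier output
}
order = list(TopologicalSorter(deps).static_order())
print(order)  # ['classify_relevance', 'analyze_message']
```

Promptflow resolves this ordering for you at run time; the point is only that any node may reference the output of any earlier node, but cycles are rejected.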

Setup and Installation

Prerequisites

Before getting started, ensure you have:

  • Python 3.9 or later
  • pip package manager
  • An LLM API key (OpenAI, Azure OpenAI, or compatible service)
  • Basic familiarity with Python and command-line tools

Installation

Install Promptflow using pip:

# Install the core Promptflow package
pip install promptflow

# Install additional tools for development
pip install promptflow-tools

# For Azure AI integration
pip install promptflow-azure

# Equivalent to installing promptflow-azure separately; quote the spec
# so shells like zsh don't expand the brackets
pip install "promptflow[azure]"

Verify Installation

# Check the installation
pf --version

# Expected output:
# promptflow: 1.x.x
# promptflow-core: 1.x.x
# promptflow-devkit: 1.x.x
# promptflow-tracing: 1.x.x

Configure LLM Connection

Set up your LLM connection (using Azure OpenAI as an example):

# Create a connection configuration file
pf connection create --file azure_openai_connection.yaml

Create azure_openai_connection.yaml:

$schema: https://azuremlschemas.azureedge.net/promptflow/latest/AzureOpenAIConnection.schema.json
name: azure_openai_connection
type: azure_open_ai
api_key: "${env:AZURE_OPENAI_API_KEY}"
api_base: "https://your-resource.openai.azure.com/"
api_type: azure
api_version: "2024-02-15-preview"

For OpenAI:

$schema: https://azuremlschemas.azureedge.net/promptflow/latest/OpenAIConnection.schema.json
name: openai_connection
type: open_ai
api_key: "${env:OPENAI_API_KEY}"
organization: "${env:OPENAI_ORG_ID}"  # Optional
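The `${env:...}` references in both files resolve from your environment, so export the keys before running `pf connection create`. The key values below are placeholders:

```shell
# Placeholder values -- substitute your real keys
export AZURE_OPENAI_API_KEY="sk-...your-key..."
export OPENAI_API_KEY="sk-...your-key..."
```

Keeping secrets in environment variables (rather than hard-coding them in the YAML) means the connection files are safe to commit.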

Project Structure

A typical Promptflow project structure:

my-promptflow-project/
├── flows/
│   ├── slack-monitor/
│   │   ├── flow.dag.yaml
│   │   ├── analyze_message.py
│   │   ├── classify_relevance.jinja2
│   │   └── requirements.txt
│   └── security-review/
│       ├── flow.dag.yaml
│       ├── run_security_scan.py
│       ├── analyze_results.jinja2
│       └── requirements.txt
├── connections/
│   └── azure_openai_connection.yaml
├── evaluations/
│   └── accuracy_evaluation.py
├── tests/
│   └── test_flows.py
└── pyproject.toml

Potential Costs

Understanding the cost structure is crucial for production deployments. Here’s a comprehensive breakdown:

LLM API Costs

| Model         | Input (per 1K tokens) | Output (per 1K tokens) | Notes                           |
|---------------|-----------------------|------------------------|---------------------------------|
| GPT-4 Turbo   | $0.01                 | $0.03                  | Best for complex reasoning      |
| GPT-4o        | $0.005                | $0.015                 | Optimized for cost/performance  |
| GPT-4o-mini   | $0.00015              | $0.0006                | Budget option for simpler tasks |
| GPT-3.5 Turbo | $0.0005               | $0.0015                | Legacy, still cost-effective    |

Azure AI Hosting Costs (Optional)

| Service              | Cost                     | Description                   |
|----------------------|--------------------------|-------------------------------|
| Azure AI Studio      | Varies                   | Managed deployment for flows  |
| Azure Container Apps | ~$0.000012/vCPU-second   | Serverless container hosting  |
| Azure Functions      | First 1M executions free | Event-driven execution        |
| Azure Kubernetes     | Varies by node size      | Full container orchestration  |

Estimated Monthly Costs by Use Case

Slack Channel Monitor Bot (Use Case 1):

  • Assuming 10,000 messages/day monitored
  • Average 100 tokens per message analysis
  • Using GPT-4o-mini for classification
  • Estimated: $15-50/month for LLM costs

Security Review Bot (Use Case 2):

  • Assuming 50 repositories reviewed/week
  • Average 5,000 tokens per security analysis
  • Using GPT-4 Turbo for detailed analysis
  • Estimated: $50-200/month for LLM costs
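These estimates follow from simple arithmetic. The sketch below assumes a single LLM call per Slack message with ~30 output tokens (an assumption, not a figure from the tables above), which lands at the low end of the quoted range; the real flow makes more than one LLM call per message, pushing spend toward the higher figures:

```python
# Back-of-envelope monthly LLM cost for a fixed per-call token budget.
def monthly_llm_cost(calls_per_day, in_tokens, out_tokens,
                     in_price_per_1k, out_price_per_1k, days=30):
    """Estimate monthly spend in USD, given per-1K-token prices."""
    per_call = (in_tokens / 1000) * in_price_per_1k \
             + (out_tokens / 1000) * out_price_per_1k
    return calls_per_day * per_call * days

# GPT-4o-mini pricing from the table: $0.00015 in / $0.0006 out per 1K tokens
slack_cost = monthly_llm_cost(10_000, 100, 30, 0.00015, 0.0006)
print(f"Slack monitor, single LLM call per message: ${slack_cost:.2f}/month")
# -> Slack monitor, single LLM call per message: $9.90/month
```

Doubling or tripling the per-message call count (classification plus summarization, plus retries) scales the result linearly, which is how the $15-50/month range arises.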

Cost Optimization Strategies

  1. Use Appropriate Models: Use cheaper models (GPT-4o-mini) for classification tasks, reserve expensive models (GPT-4 Turbo) for complex analysis

  2. Implement Caching: Cache responses for repeated queries to reduce API calls

  3. Batch Processing: Combine multiple requests when possible

  4. Token Optimization: Keep prompts concise and use system messages efficiently

  5. Rate Limiting: Implement rate limits to prevent runaway costs

# Example: Cost-aware model selection
def select_model(task_complexity: str) -> str:
    """Select appropriate model based on task complexity."""
    model_mapping = {
        "simple": "gpt-4o-mini",
        "moderate": "gpt-4o",
        "complex": "gpt-4-turbo"
    }
    return model_mapping.get(task_complexity, "gpt-4o-mini")
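Strategy 2 (caching) can be as simple as memoizing on the (model, prompt) pair. The sketch below uses a stand-in function rather than a real API client, so the cache-hit behavior is easy to verify:

```python
# Minimal response cache keyed on (model, prompt). cached_completion is a
# stand-in for a real LLM call; only the caching pattern is the point here.
from functools import lru_cache

call_count = 0  # tracks how many "real" calls were made

@lru_cache(maxsize=1024)
def cached_completion(model: str, prompt: str) -> str:
    global call_count
    call_count += 1
    return f"[{model}] response to: {prompt}"

cached_completion("gpt-4o-mini", "classify: server down")
cached_completion("gpt-4o-mini", "classify: server down")  # served from cache
print(call_count)  # 1
```

Note this only pays off for exact repeats (common when re-processing edited Slack messages or re-running evaluations); semantically similar prompts still miss the cache.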

Iterative Development Workflow

Promptflow encourages an iterative development approach. Here’s a structured workflow:

Phase 1: Prototype Development

Step 1: Define Your Flow

Create flow.dag.yaml:

$schema: https://azuremlschemas.azureedge.net/promptflow/latest/Flow.schema.json
inputs:
  message:
    type: string
    description: "The input message to analyze"
  context:
    type: string
    description: "Additional context for analysis"
outputs:
  analysis:
    type: string
    reference: ${analyze_message.output}
  classification:
    type: string
    reference: ${classify_relevance.output}
nodes:
  - name: classify_relevance
    type: llm
    source:
      type: code
      path: classify_relevance.jinja2
    inputs:
      deployment_name: gpt-4o-mini
      message: ${inputs.message}
    connection: azure_openai_connection
    api: chat
  
  - name: analyze_message
    type: python
    source:
      type: code
      path: analyze_message.py
    inputs:
      message: ${inputs.message}
      classification: ${classify_relevance.output}

Step 2: Create Prompt Templates

classify_relevance.jinja2:

system:
You are a message classification assistant. Classify the following message based on its relevance to specific stakeholders.

user:
Message: {{ message }}

Classify this message into one of the following categories:
- HIGH_PRIORITY: Urgent matters requiring immediate attention
- RELEVANT: Topics of interest to stakeholders
- INFORMATIONAL: General updates or announcements
- NOT_RELEVANT: Messages outside stakeholder interests

Respond with only the category name and a brief justification.

Step 3: Test Locally

# Run your flow locally
pf flow test --flow ./flows/slack-monitor --inputs message="Urgent: Production server down" context="DevOps team"

# Interactive testing with UI
pf flow serve --source ./flows/slack-monitor --port 8080

Phase 2: Evaluation and Refinement

Step 1: Create Evaluation Dataset

evaluation_data.jsonl:

{"message": "Server is down in production", "expected_classification": "HIGH_PRIORITY"}
{"message": "Team lunch tomorrow at noon", "expected_classification": "NOT_RELEVANT"}
{"message": "New security patch available", "expected_classification": "RELEVANT"}
{"message": "Q4 roadmap discussion scheduled", "expected_classification": "RELEVANT"}

Step 2: Define Evaluation Metrics

evaluations/accuracy_evaluation.py:

from promptflow import tool

@tool
def evaluate_classification(
    prediction: str,
    ground_truth: str
) -> dict:
    """Evaluate classification accuracy."""
    prediction_category = prediction.split(':')[0].strip().upper()
    
    is_correct = prediction_category == ground_truth.upper()
    
    return {
        "is_correct": is_correct,
        "prediction": prediction_category,
        "expected": ground_truth
    }
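The per-row metric above is typically rolled up into a single score across the dataset. A minimal aggregation in plain Python (Promptflow evaluation flows often do this in a dedicated aggregation node; this sketch just shows the computation):

```python
# Roll per-row evaluation results up into an overall accuracy figure.
def aggregate_accuracy(row_results: list) -> dict:
    """Compute dataset-level accuracy from rows like {'is_correct': bool}."""
    if not row_results:
        return {"accuracy": 0.0}
    correct = sum(1 for r in row_results if r["is_correct"])
    return {"accuracy": correct / len(row_results)}

rows = [{"is_correct": True}, {"is_correct": True},
        {"is_correct": True}, {"is_correct": False}]
print(aggregate_accuracy(rows)["accuracy"])  # 0.75
```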

Step 3: Run Batch Evaluation

# Run evaluation
pf run create \
  --flow ./flows/slack-monitor \
  --data ./evaluation_data.jsonl \
  --column-mapping message='${data.message}' \
  --stream

# View results
pf run show-details --name <run-name>
pf run visualize --name <run-name>

Phase 3: Optimization

Prompt Tuning: Iterate on prompts based on evaluation results

# Experiment tracking
from promptflow import PFClient

client = PFClient()

# Compare prompt variants (defined on the classify_relevance node in
# flow.dag.yaml, e.g. variant_0 / variant_1) against the same dataset
for variant_name in ["variant_0", "variant_1"]:
    run = client.run(
        flow="./flows/slack-monitor",
        data="./evaluation_data.jsonl",
        variant="${classify_relevance." + variant_name + "}",
        display_name=f"Evaluation - {variant_name}"
    )

Performance Optimization:

# Add caching to reduce API calls
nodes:
  - name: classify_relevance
    type: llm
    source:
      type: code
      path: classify_relevance.jinja2
    inputs:
      deployment_name: gpt-4o-mini
      message: ${inputs.message}
    connection: azure_openai_connection
    api: chat
    cache: true  # Enable caching for repeated inputs

Use Case 1: Slack Channel Monitor Bot

This bot monitors Slack channels for discussions relevant to specific stakeholders and notifies them in real-time.

Architecture Overview

┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│  Slack Events   │───▶│  Promptflow Bot  │───▶│  Notifications  │
│  (Messages)     │    │  (Classification)│    │  (DMs/Alerts)   │
└─────────────────┘    └──────────────────┘    └─────────────────┘
         │                      │                       │
         ▼                      ▼                       ▼
   Webhook Listener      LLM Analysis         Slack API (DM)

Implementation

Flow Definition (flow.dag.yaml):

$schema: https://azuremlschemas.azureedge.net/promptflow/latest/Flow.schema.json
inputs:
  message_text:
    type: string
    description: "The Slack message content"
  channel_name:
    type: string
    description: "The source channel"
  sender:
    type: string
    description: "Message author"
  stakeholder_interests:
    type: object
    description: "Mapping of stakeholders to their interests"
outputs:
  should_notify:
    type: bool
    reference: ${determine_notification.output.should_notify}
  stakeholders:
    type: list
    reference: ${determine_notification.output.stakeholders}
  summary:
    type: string
    reference: ${summarize_discussion.output}
nodes:
  - name: analyze_topic
    type: llm
    source:
      type: code
      path: prompts/analyze_topic.jinja2
    inputs:
      deployment_name: gpt-4o-mini
      message: ${inputs.message_text}
      channel: ${inputs.channel_name}
    connection: azure_openai_connection
    api: chat

  - name: determine_notification
    type: python
    source:
      type: code
      path: tools/determine_notification.py
    inputs:
      topic_analysis: ${analyze_topic.output}
      stakeholder_interests: ${inputs.stakeholder_interests}
      channel: ${inputs.channel_name}

  - name: summarize_discussion
    type: llm
    source:
      type: code
      path: prompts/summarize.jinja2
    inputs:
      deployment_name: gpt-4o-mini
      message: ${inputs.message_text}
      topic: ${analyze_topic.output}
    connection: azure_openai_connection
    api: chat
    activate:
      when: ${determine_notification.output.should_notify}
      is: true

Topic Analysis Prompt (prompts/analyze_topic.jinja2):

system:
You are an expert at analyzing workplace communications. Extract the main topics, technologies mentioned, and the nature of the discussion from Slack messages.

user:
Channel: {{ channel }}
Message: {{ message }}

Analyze this message and provide:
1. Main topic (one phrase)
2. Technologies/products mentioned (comma-separated list)
3. Discussion type: question, announcement, incident, casual, decision, or review
4. Urgency level: high, medium, or low
5. Key entities mentioned (people, teams, systems)

Format your response as JSON:
{
  "topic": "",
  "technologies": [],
  "discussion_type": "",
  "urgency": "",
  "entities": []
}
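Even with an explicit format instruction, models do not always return bare JSON; replies are sometimes wrapped in Markdown code fences. A defensive parser (our own addition, not part of the flow shown here) avoids hard failures downstream:

```python
import json
import re

def parse_llm_json(text: str) -> dict:
    """Extract the first JSON object from an LLM reply, tolerating ``` fences
    and surrounding prose."""
    match = re.search(r"\{.*\}", text, re.DOTALL)
    if not match:
        raise ValueError("no JSON object found in LLM reply")
    return json.loads(match.group(0))

reply = '```json\n{"topic": "outage", "urgency": "high"}\n```'
print(parse_llm_json(reply)["urgency"])  # high
```

Dropping this between the LLM node and the notification logic makes the `json.loads` failure path in determine_notification far less likely to trigger.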

Notification Logic (tools/determine_notification.py):

import json
from promptflow import tool
from typing import Any

@tool
def determine_notification(
    topic_analysis: str,
    stakeholder_interests: dict,
    channel: str
) -> dict:
    """
    Determine which stakeholders should be notified based on
    their interests and the analyzed message topic.
    """
    try:
        analysis = json.loads(topic_analysis)
    except json.JSONDecodeError:
        return {"should_notify": False, "stakeholders": [], "reason": "Invalid analysis"}
    
    stakeholders_to_notify = []
    
    for stakeholder, interests in stakeholder_interests.items():
        interest_keywords = interests.get("keywords", [])
        interest_channels = interests.get("channels", [])
        interest_discussion_types = interests.get("discussion_types", [])
        urgency_threshold = interests.get("urgency_threshold", "high")
        
        # Check channel match
        channel_match = channel in interest_channels or "*" in interest_channels
        
        # Check topic/technology match
        topic_match = any(
            keyword.lower() in analysis.get("topic", "").lower() or
            keyword.lower() in [t.lower() for t in analysis.get("technologies", [])]
            for keyword in interest_keywords
        )
        
        # Check discussion type match
        discussion_match = (
            analysis.get("discussion_type", "") in interest_discussion_types or
            "*" in interest_discussion_types
        )
        
        # Check urgency
        urgency_levels = {"low": 1, "medium": 2, "high": 3}
        message_urgency = urgency_levels.get(analysis.get("urgency", "low"), 1)
        threshold_urgency = urgency_levels.get(urgency_threshold, 3)
        urgency_match = message_urgency >= threshold_urgency
        
        # Determine if stakeholder should be notified
        if channel_match and (topic_match or discussion_match) and urgency_match:
            stakeholders_to_notify.append({
                "id": stakeholder,
                "match_reasons": {
                    "channel": channel_match,
                    "topic": topic_match,
                    "discussion_type": discussion_match,
                    "urgency": urgency_match
                }
            })
    
    return {
        "should_notify": len(stakeholders_to_notify) > 0,
        "stakeholders": stakeholders_to_notify,
        "analysis": analysis
    }

Slack Integration (slack_integration.py):

import os
import json
from slack_sdk import WebClient
from slack_sdk.socket_mode import SocketModeClient
from slack_sdk.socket_mode.request import SocketModeRequest
from slack_sdk.socket_mode.response import SocketModeResponse
from promptflow import load_flow

class SlackMonitorBot:
    def __init__(self):
        self.slack_client = WebClient(token=os.environ["SLACK_BOT_TOKEN"])
        self.socket_client = SocketModeClient(
            app_token=os.environ["SLACK_APP_TOKEN"],
            web_client=self.slack_client
        )
        self.flow = load_flow("./flows/slack-monitor")
        self.stakeholder_interests = self._load_stakeholder_config()
    
    def _load_stakeholder_config(self) -> dict:
        """Load stakeholder interest configuration."""
        with open("config/stakeholders.json") as f:
            return json.load(f)
    
    def handle_message(self, client, req: SocketModeRequest):
        """Handle incoming Slack messages."""
        if req.type == "events_api":
            event = req.payload.get("event", {})
            
            if event.get("type") == "message" and not event.get("bot_id"):
                # Acknowledge the request
                response = SocketModeResponse(envelope_id=req.envelope_id)
                client.send_socket_mode_response(response)
                
                # Get channel info
                channel_info = self.slack_client.conversations_info(
                    channel=event["channel"]
                )
                channel_name = channel_info["channel"]["name"]
                
                # Get sender info
                user_info = self.slack_client.users_info(user=event["user"])
                sender_name = user_info["user"]["real_name"]
                
                # Run the Promptflow flow
                result = self.flow(
                    message_text=event["text"],
                    channel_name=channel_name,
                    sender=sender_name,
                    stakeholder_interests=self.stakeholder_interests
                )
                
                # Send notifications if needed
                if result["should_notify"]:
                    self._send_notifications(
                        stakeholders=result["stakeholders"],
                        summary=result["summary"],
                        channel=channel_name,
                        original_channel_id=event["channel"],
                        message_ts=event["ts"]
                    )
    
    def _send_notifications(
        self,
        stakeholders: list,
        summary: str,
        channel: str,
        original_channel_id: str,
        message_ts: str
    ):
        """Send DM notifications to relevant stakeholders."""
        # Construct permalink
        permalink = self.slack_client.chat_getPermalink(
            channel=original_channel_id,
            message_ts=message_ts
        )["permalink"]
        
        for stakeholder in stakeholders:
            # Open DM channel with stakeholder
            dm = self.slack_client.conversations_open(
                users=[stakeholder["id"]]
            )
            
            # Send notification
            self.slack_client.chat_postMessage(
                channel=dm["channel"]["id"],
                blocks=[
                    {
                        "type": "header",
                        "text": {
                            "type": "plain_text",
                            "text": f"📢 Relevant Discussion in #{channel}"
                        }
                    },
                    {
                        "type": "section",
                        "text": {
                            "type": "mrkdwn",
                            "text": f"*Summary:*\n{summary}"
                        }
                    },
                    {
                        "type": "section",
                        "text": {
                            "type": "mrkdwn",
                            "text": f"*Why you're seeing this:*\n" + 
                                   ", ".join(k for k, v in stakeholder["match_reasons"].items() if v)
                        }
                    },
                    {
                        "type": "actions",
                        "elements": [
                            {
                                "type": "button",
                                "text": {
                                    "type": "plain_text",
                                    "text": "View Discussion"
                                },
                                "url": permalink
                            },
                            {
                                "type": "button",
                                "text": {
                                    "type": "plain_text",
                                    "text": "Mute This Topic"
                                },
                                "action_id": "mute_topic"
                            }
                        ]
                    }
                ]
            )
    
    def start(self):
        """Start the bot."""
        self.socket_client.socket_mode_request_listeners.append(
            self.handle_message
        )
        self.socket_client.connect()
        print("Bot is running...")

if __name__ == "__main__":
    bot = SlackMonitorBot()
    bot.start()

Stakeholder Configuration (config/stakeholders.json):

{
  "U12345678": {
    "name": "Sarah (Security Lead)",
    "keywords": ["security", "vulnerability", "CVE", "breach", "authentication", "authorization"],
    "channels": ["security-alerts", "engineering", "incidents"],
    "discussion_types": ["incident", "announcement", "question"],
    "urgency_threshold": "medium"
  },
  "U87654321": {
    "name": "Mike (Platform Lead)",
    "keywords": ["kubernetes", "docker", "AWS", "infrastructure", "deployment", "scaling"],
    "channels": ["platform", "engineering", "incidents", "devops"],
    "discussion_types": ["incident", "decision", "question"],
    "urgency_threshold": "medium"
  },
  "U11111111": {
    "name": "Lisa (Product Manager)",
    "keywords": ["roadmap", "feature", "customer", "feedback", "release"],
    "channels": ["product", "general", "engineering"],
    "discussion_types": ["decision", "announcement"],
    "urgency_threshold": "low"
  }
}
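A malformed entry in this file silently disables notifications for that stakeholder, so it is worth validating at startup. A lightweight sketch (the required keys mirror how determine_notification reads the config; the validator itself is our addition, not part of the bot above):

```python
# Validate stakeholder config entries before the bot starts.
REQUIRED_KEYS = {"name", "keywords", "channels", "discussion_types", "urgency_threshold"}
URGENCY_LEVELS = {"low", "medium", "high"}

def validate_stakeholders(config: dict) -> list:
    """Return a list of human-readable errors; empty means the config is OK."""
    errors = []
    for user_id, prefs in config.items():
        missing = REQUIRED_KEYS - prefs.keys()
        if missing:
            errors.append(f"{user_id}: missing {sorted(missing)}")
        if prefs.get("urgency_threshold") not in URGENCY_LEVELS:
            errors.append(f"{user_id}: bad urgency_threshold")
    return errors

sample = {"U12345678": {"name": "Sarah", "keywords": ["security"],
                        "channels": ["incidents"], "discussion_types": ["incident"],
                        "urgency_threshold": "medium"}}
print(validate_stakeholders(sample))  # []
```

Calling this from `_load_stakeholder_config` and raising on a non-empty result would fail fast instead of dropping notifications quietly.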

Use Case 2: Security Review Bot for Open Source Repositories

This bot performs automated security reviews on open source repositories using Python security scanning tools and provides analysis through Slack.

Architecture Overview

┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│  Slack Command  │───▶│  Promptflow Bot  │───▶│  Results Report │
│  /security-scan │    │  + Python Tools  │    │  (Slack Thread) │
└─────────────────┘    └──────────────────┘    └─────────────────┘
         │                      │                       │
         ▼                      ▼                       ▼
   Repo URL Input        Security Scans          Stakeholder DMs
                         (Bandit, Safety,         (for high risk)
                          Semgrep, etc.)

Implementation

Flow Definition (flow.dag.yaml):

$schema: https://azuremlschemas.azureedge.net/promptflow/latest/Flow.schema.json
inputs:
  repo_url:
    type: string
    description: "GitHub repository URL to scan"
  scan_types:
    type: list
    description: "Types of security scans to run"
    default: ["bandit", "safety", "semgrep"]
  notify_stakeholders:
    type: list
    description: "Stakeholder IDs to notify on high-risk findings"
outputs:
  summary:
    type: string
    reference: ${generate_report.output.summary}
  risk_level:
    type: string
    reference: ${generate_report.output.risk_level}
  findings:
    type: list
    reference: ${generate_report.output.findings}
  recommendations:
    type: string
    reference: ${generate_recommendations.output}
nodes:
  - name: clone_repository
    type: python
    source:
      type: code
      path: tools/clone_repo.py
    inputs:
      repo_url: ${inputs.repo_url}

  - name: run_bandit_scan
    type: python
    source:
      type: code
      path: tools/run_bandit.py
    inputs:
      repo_path: ${clone_repository.output.repo_path}
    activate:
      when: ${inputs.scan_types}
      contains: "bandit"

  - name: run_safety_scan
    type: python
    source:
      type: code
      path: tools/run_safety.py
    inputs:
      repo_path: ${clone_repository.output.repo_path}
    activate:
      when: ${inputs.scan_types}
      contains: "safety"

  - name: run_semgrep_scan
    type: python
    source:
      type: code
      path: tools/run_semgrep.py
    inputs:
      repo_path: ${clone_repository.output.repo_path}
    activate:
      when: ${inputs.scan_types}
      contains: "semgrep"

  - name: aggregate_results
    type: python
    source:
      type: code
      path: tools/aggregate_results.py
    inputs:
      bandit_results: ${run_bandit_scan.output}
      safety_results: ${run_safety_scan.output}
      semgrep_results: ${run_semgrep_scan.output}

  - name: generate_report
    type: llm
    source:
      type: code
      path: prompts/generate_report.jinja2
    inputs:
      deployment_name: gpt-4-turbo
      aggregated_results: ${aggregate_results.output}
      repo_url: ${inputs.repo_url}
    connection: azure_openai_connection
    api: chat

  - name: generate_recommendations
    type: llm
    source:
      type: code
      path: prompts/recommendations.jinja2
    inputs:
      deployment_name: gpt-4-turbo
      findings: ${generate_report.output.findings}
      risk_level: ${generate_report.output.risk_level}
    connection: azure_openai_connection
    api: chat

  - name: cleanup
    type: python
    source:
      type: code
      path: tools/cleanup.py
    inputs:
      repo_path: ${clone_repository.output.repo_path}
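The flow above references tools/aggregate_results.py, which is not shown; a plausible sketch follows. The merged output format is an assumption on our part, and the real tool would carry Promptflow's @tool decorator (omitted here so the snippet runs standalone):

```python
# Merge the per-scanner outputs into one structure for the report LLM node.
def aggregate_results(bandit_results: dict, safety_results: dict,
                      semgrep_results: dict) -> dict:
    """Combine scanner outputs; each input may be None if its scan was
    skipped by the node's activate condition."""
    all_issues = []
    scan_errors = []
    for result in (bandit_results, safety_results, semgrep_results):
        if not result:
            continue  # scan skipped entirely
        if result.get("success"):
            all_issues.extend(result.get("issues", []))
        else:
            scan_errors.append({"tool": result.get("tool"),
                                "error": result.get("error")})
    counts = {"HIGH": 0, "MEDIUM": 0, "LOW": 0}
    for issue in all_issues:
        sev = issue.get("severity", "LOW")
        counts[sev] = counts.get(sev, 0) + 1
    return {"issues": all_issues, "severity_counts": counts,
            "scan_errors": scan_errors}

merged = aggregate_results(
    {"tool": "bandit", "success": True, "issues": [{"severity": "HIGH"}]},
    {"tool": "safety", "success": True, "issues": [{"severity": "MEDIUM"}]},
    {"tool": "semgrep", "success": False, "error": "timeout"},
)
print(merged["severity_counts"])  # {'HIGH': 1, 'MEDIUM': 1, 'LOW': 0}
```

Passing failed scans through as `scan_errors` (rather than dropping them) lets the report node disclose incomplete coverage to the reader.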

Clone Repository Tool (tools/clone_repo.py):

import os
import shutil
import tempfile
import subprocess
from promptflow import tool

@tool
def clone_repository(repo_url: str) -> dict:
    """Clone a GitHub repository for security scanning."""
    # Validate URL format
    if not repo_url.startswith(("https://github.com/", "git@github.com:")):
        raise ValueError("Only GitHub repositories are supported")
    
    # Create temp directory
    temp_dir = tempfile.mkdtemp(prefix="security_scan_")
    
    try:
        # Shallow clone to keep the download small and fast
        result = subprocess.run(
            ["git", "clone", "--depth", "1", repo_url, temp_dir],
            capture_output=True,
            text=True,
            timeout=120
        )
        
        if result.returncode != 0:
            raise RuntimeError(f"Failed to clone repository: {result.stderr}")
        
        # Extract repo name (strip only a trailing .git suffix)
        repo_name = repo_url.rstrip("/").split("/")[-1].removesuffix(".git")
        
        return {
            "repo_path": temp_dir,
            "repo_name": repo_name,
            "success": True
        }
    except Exception:
        # Clean up the partial clone on failure; shutil.rmtree is portable,
        # unlike shelling out to rm -rf
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise

Bandit Scanner (tools/run_bandit.py):

import subprocess
import json
from promptflow import tool

@tool
def run_bandit_scan(repo_path: str) -> dict:
    """
    Run Bandit security scanner on Python code.
    
    Bandit is a tool designed to find common security issues in Python code.
    """
    try:
        result = subprocess.run(
            [
                "bandit",
                "-r", repo_path,
                "-f", "json",
                "-ll",  # Only report medium and higher severity
                "--exclude", "*/test*,*/.git/*,*/venv/*"
            ],
            capture_output=True,
            text=True,
            timeout=300
        )
        
        # Bandit returns exit code 1 if issues are found
        if result.stdout:
            findings = json.loads(result.stdout)
        else:
            findings = {"results": [], "metrics": {}}
        
        # Transform to common format
        issues = []
        for finding in findings.get("results", []):
            issues.append({
                "tool": "bandit",
                "severity": finding.get("issue_severity", "UNKNOWN"),
                "confidence": finding.get("issue_confidence", "UNKNOWN"),
                "file": finding.get("filename", "").replace(repo_path, ""),
                "line": finding.get("line_number", 0),
                "issue_text": finding.get("issue_text", ""),
                "test_id": finding.get("test_id", ""),
                "cwe": finding.get("issue_cwe", {}).get("id", None),
                "code_snippet": finding.get("code", "")
            })
        
        return {
            "tool": "bandit",
            "success": True,
            "issues": issues,
            "summary": {
                "high": len([i for i in issues if i["severity"] == "HIGH"]),
                "medium": len([i for i in issues if i["severity"] == "MEDIUM"]),
                "low": len([i for i in issues if i["severity"] == "LOW"])
            }
        }
    except subprocess.TimeoutExpired:
        return {
            "tool": "bandit",
            "success": False,
            "error": "Scan timed out after 5 minutes",
            "issues": []
        }
    except Exception as e:
        return {
            "tool": "bandit",
            "success": False,
            "error": str(e),
            "issues": []
        }

Safety Scanner (tools/run_safety.py):

import subprocess
import json
import os
from promptflow import tool

@tool
def run_safety_scan(repo_path: str) -> dict:
    """
    Run Safety to check for known vulnerabilities in Python dependencies.
    """
    issues = []
    requirements_files = []
    
    # Find all requirements files
    for root, dirs, files in os.walk(repo_path):
        # Skip hidden and virtual environment directories
        dirs[:] = [d for d in dirs if not d.startswith('.') and d not in ['venv', 'env', 'node_modules']]
        
        for file in files:
            if file in ['requirements.txt', 'requirements-dev.txt', 'requirements-test.txt']:
                requirements_files.append(os.path.join(root, file))
            elif file == 'Pipfile.lock' or file == 'poetry.lock':
                requirements_files.append(os.path.join(root, file))
    
    for req_file in requirements_files:
        try:
            result = subprocess.run(
                [
                    "safety", "check",
                    "-r", req_file,
                    "--json"
                ],
                capture_output=True,
                text=True,
                timeout=120
            )
            
            if result.stdout:
                try:
                    safety_output = json.loads(result.stdout)
                    vulnerabilities = safety_output if isinstance(safety_output, list) else safety_output.get("vulnerabilities", [])
                    
                    for vuln in vulnerabilities:
                        issues.append({
                            "tool": "safety",
                            "severity": "HIGH" if "critical" in str(vuln).lower() else "MEDIUM",
                            "package": vuln[0] if isinstance(vuln, list) else vuln.get("package_name", "unknown"),
                            "affected_version": vuln[2] if isinstance(vuln, list) else vuln.get("installed_version", "unknown"),
                            "vulnerability_id": vuln[4] if isinstance(vuln, list) else vuln.get("vulnerability_id", "unknown"),
                            "description": vuln[3] if isinstance(vuln, list) else vuln.get("advisory", ""),
                            "file": req_file.replace(repo_path, "")
                        })
                except json.JSONDecodeError:
                    pass
        except Exception:
            # Skip files that safety cannot read or that time out
            continue
    
    return {
        "tool": "safety",
        "success": True,
        "issues": issues,
        "files_scanned": len(requirements_files),
        "summary": {
            "high": len([i for i in issues if i["severity"] == "HIGH"]),
            "medium": len([i for i in issues if i["severity"] == "MEDIUM"]),
            "low": len([i for i in issues if i["severity"] == "LOW"])
        }
    }

Semgrep Scanner (tools/run_semgrep.py):

import subprocess
import json
from promptflow import tool

@tool
def run_semgrep_scan(repo_path: str) -> dict:
    """
    Run Semgrep with security-focused rulesets.
    """
    try:
        result = subprocess.run(
            [
                "semgrep", "scan",
                "--config", "p/security-audit",
                "--config", "p/secrets",
                "--json",
                "--timeout", "300",
                repo_path
            ],
            capture_output=True,
            text=True,
            timeout=360
        )
        
        issues = []
        
        if result.stdout:
            try:
                semgrep_output = json.loads(result.stdout)
                
                for finding in semgrep_output.get("results", []):
                    # Map Semgrep severity to our format
                    severity_map = {
                        "ERROR": "HIGH",
                        "WARNING": "MEDIUM",
                        "INFO": "LOW"
                    }
                    
                    issues.append({
                        "tool": "semgrep",
                        "severity": severity_map.get(finding.get("extra", {}).get("severity", "INFO"), "LOW"),
                        "rule_id": finding.get("check_id", ""),
                        "file": finding.get("path", "").replace(repo_path, ""),
                        "line_start": finding.get("start", {}).get("line", 0),
                        "line_end": finding.get("end", {}).get("line", 0),
                        "message": finding.get("extra", {}).get("message", ""),
                        "category": finding.get("extra", {}).get("metadata", {}).get("category", "security"),
                        "cwe": finding.get("extra", {}).get("metadata", {}).get("cwe", []),
                        "owasp": finding.get("extra", {}).get("metadata", {}).get("owasp", [])
                    })
            except json.JSONDecodeError:
                pass
        
        return {
            "tool": "semgrep",
            "success": True,
            "issues": issues,
            "summary": {
                "high": len([i for i in issues if i["severity"] == "HIGH"]),
                "medium": len([i for i in issues if i["severity"] == "MEDIUM"]),
                "low": len([i for i in issues if i["severity"] == "LOW"])
            }
        }
    except subprocess.TimeoutExpired:
        return {
            "tool": "semgrep",
            "success": False,
            "error": "Scan timed out",
            "issues": []
        }
    except Exception as e:
        return {
            "tool": "semgrep",
            "success": False,
            "error": str(e),
            "issues": []
        }
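Before the report prompt runs, a flow node has to merge the per-tool outputs into the `aggregated_results` input it consumes. A minimal sketch of that aggregation node follows; the function name and the exact shape of `aggregated_results` are assumptions, not something the scanners above dictate:

```python
def aggregate_results(*tool_results: dict) -> dict:
    """Merge per-tool scan results into one dict for the report prompt.

    Sketch only: the output shape is an assumption for illustration.
    """
    all_issues = []
    scan_errors = []
    for result in tool_results:
        if result.get("success"):
            all_issues.extend(result.get("issues", []))
        else:
            # Keep failed scans visible so the report can mention coverage gaps
            scan_errors.append({"tool": result.get("tool"), "error": result.get("error")})
    return {
        "total_issues": len(all_issues),
        "by_severity": {
            sev: len([i for i in all_issues if i.get("severity") == sev])
            for sev in ("HIGH", "MEDIUM", "LOW")
        },
        "issues": all_issues,
        "scan_errors": scan_errors,
    }
```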

Report Generation Prompt (prompts/generate_report.jinja2):

system:
You are a security analyst expert. Analyze the security scan results and provide a comprehensive, actionable security report. Focus on severity, exploitability, and practical remediation steps.

user:
Repository: {{ repo_url }}

Security Scan Results:
{{ aggregated_results | tojson(indent=2) }}

Generate a security report with the following structure:
1. Executive Summary (2-3 sentences)
2. Overall Risk Level (CRITICAL, HIGH, MEDIUM, LOW)
3. Key Findings (top 5 most critical issues)
4. Detailed Findings by Category
5. Statistics

Format your response as JSON:
{
  "summary": "Executive summary text",
  "risk_level": "CRITICAL|HIGH|MEDIUM|LOW",
  "key_findings": [
    {
      "title": "Finding title",
      "severity": "HIGH|MEDIUM|LOW",
      "description": "Brief description",
      "file": "affected file",
      "remediation": "How to fix"
    }
  ],
  "findings_by_category": {
    "code_vulnerabilities": [...],
    "dependency_vulnerabilities": [...],
    "secrets_exposure": [...],
    "other": [...]
  },
  "statistics": {
    "total_issues": 0,
    "critical": 0,
    "high": 0,
    "medium": 0,
    "low": 0,
    "files_affected": 0
  }
}
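Because the model returns this report as text, the flow needs a parsing step, and models sometimes wrap JSON in a markdown code fence despite instructions. A defensive parsing sketch (the function name and required-key list are assumptions):

```python
import json

def parse_report(raw: str) -> dict:
    """Parse the model's JSON report, tolerating a markdown code fence.

    Sketch only: adapt the required keys to your report schema.
    """
    text = raw.strip()
    if text.startswith("```"):
        # Drop the opening fence line (e.g. "```json") and the closing fence
        text = text.split("\n", 1)[1]
        text = text.rsplit("```", 1)[0]
    report = json.loads(text)
    # Fail fast if the model omitted a required top-level key
    for key in ("summary", "risk_level", "key_findings", "statistics"):
        if key not in report:
            raise ValueError(f"Report missing required key: {key}")
    return report
```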

Slack Integration for Security Bot (security_bot_slack.py):

import os
import json
from slack_bolt import App
from slack_bolt.adapter.socket_mode import SocketModeHandler
from promptflow import load_flow

app = App(token=os.environ["SLACK_BOT_TOKEN"])
flow = load_flow("./flows/security-review")

# Stakeholders to notify for high-risk findings
SECURITY_STAKEHOLDERS = ["U12345678", "U87654321"]

@app.command("/security-scan")
def handle_security_scan(ack, say, command):
    """Handle the /security-scan slash command."""
    ack()
    
    repo_url = command["text"].strip()
    user_id = command["user_id"]
    channel_id = command["channel_id"]
    
    # Validate input
    if not repo_url:
        say("Please provide a repository URL. Usage: `/security-scan https://github.com/owner/repo`")
        return
    
    # Send initial response
    initial_message = say(
        blocks=[
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"🔍 *Starting security scan*\nRepository: `{repo_url}`\n\nThis may take a few minutes..."
                }
            }
        ]
    )
    
    try:
        # Run the security scan flow
        result = flow(
            repo_url=repo_url,
            scan_types=["bandit", "safety", "semgrep"],
            notify_stakeholders=SECURITY_STAKEHOLDERS
        )
        
        # Format and send results
        send_scan_results(
            channel_id=channel_id,
            thread_ts=initial_message["ts"],
            result=result,
            repo_url=repo_url
        )
        
        # Notify stakeholders if high risk
        if result["risk_level"] in ["CRITICAL", "HIGH"]:
            notify_security_stakeholders(
                result=result,
                repo_url=repo_url,
                requested_by=user_id,
                channel_id=channel_id
            )
    
    except Exception as e:
        app.client.chat_postMessage(
            channel=channel_id,
            thread_ts=initial_message["ts"],
            text=f"❌ Security scan failed: {str(e)}"
        )

def send_scan_results(channel_id: str, thread_ts: str, result: dict, repo_url: str):
    """Send formatted scan results to Slack."""
    
    # Risk level emoji mapping
    risk_emoji = {
        "CRITICAL": "🔴",
        "HIGH": "🟠",
        "MEDIUM": "🟡",
        "LOW": "🟢"
    }
    
    # Build blocks
    blocks = [
        {
            "type": "header",
            "text": {
                "type": "plain_text",
                "text": f"Security Scan Complete {risk_emoji.get(result['risk_level'], '⚪')}"
            }
        },
        {
            "type": "section",
            "fields": [
                {
                    "type": "mrkdwn",
                    "text": f"*Repository:*\n<{repo_url}|{repo_url.split('/')[-1]}>"
                },
                {
                    "type": "mrkdwn",
                    "text": f"*Risk Level:*\n{result['risk_level']}"
                }
            ]
        },
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"*Summary:*\n{result['summary']}"
            }
        },
        {"type": "divider"}
    ]
    
    # Add statistics
    stats = result.get("findings", {}).get("statistics", {})
    blocks.append({
        "type": "section",
        "text": {
            "type": "mrkdwn",
            "text": f"*Statistics:*\n• Total Issues: {stats.get('total_issues', 0)}\n• Critical: {stats.get('critical', 0)}\n• High: {stats.get('high', 0)}\n• Medium: {stats.get('medium', 0)}\n• Low: {stats.get('low', 0)}"
        }
    })
    
    # Add key findings (the report nests them under findings.key_findings)
    key_findings = result.get("findings", {}).get("key_findings", [])
    if key_findings:
        blocks.append({"type": "divider"})
        blocks.append({
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": "*Key Findings:*"
            }
        })
        
        for finding in key_findings[:5]:  # Top 5 findings
            severity_emoji = {"HIGH": "🔴", "MEDIUM": "🟡", "LOW": "🟢"}.get(finding.get("severity", ""), "⚪")
            blocks.append({
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"{severity_emoji} *{finding.get('title', 'Unknown')}*\n{finding.get('description', '')}\n_File: {finding.get('file', 'N/A')}_"
                }
            })
    
    # Add recommendations
    if result.get("recommendations"):
        blocks.append({"type": "divider"})
        blocks.append({
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"*Recommendations:*\n{result['recommendations']}"
            }
        })
    
    app.client.chat_postMessage(
        channel=channel_id,
        thread_ts=thread_ts,
        blocks=blocks
    )

def notify_security_stakeholders(result: dict, repo_url: str, requested_by: str, channel_id: str):
    """Send DM notifications to security stakeholders for high-risk findings."""
    
    for stakeholder_id in SECURITY_STAKEHOLDERS:
        dm = app.client.conversations_open(users=[stakeholder_id])
        
        app.client.chat_postMessage(
            channel=dm["channel"]["id"],
            blocks=[
                {
                    "type": "header",
                    "text": {
                        "type": "plain_text",
                        "text": f"🚨 High-Risk Security Scan Alert"
                    }
                },
                {
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": f"A security scan of `{repo_url}` has returned *{result['risk_level']}* risk findings.\n\n*Summary:*\n{result['summary']}"
                    }
                },
                {
                    "type": "section",
                    "fields": [
                        {
                            "type": "mrkdwn",
                            "text": f"*Requested by:*\n<@{requested_by}>"
                        },
                        {
                            "type": "mrkdwn",
                            "text": f"*Channel:*\n<#{channel_id}>"
                        }
                    ]
                },
                {
                    "type": "actions",
                    "elements": [
                        {
                            "type": "button",
                            "text": {
                                "type": "plain_text",
                                "text": "View Full Report"
                            },
                            "action_id": "view_report",
                            "value": json.dumps({"channel": channel_id, "repo": repo_url})
                        }
                    ]
                }
            ]
        )

if __name__ == "__main__":
    handler = SocketModeHandler(app, os.environ["SLACK_APP_TOKEN"])
    handler.start()

Releasing a Live Version

Deployment Options

Option 1: Azure Container Apps

# azure-container-app.yaml
name: promptflow-slack-bot
resourceGroup: my-resource-group
location: eastus
properties:
  configuration:
    ingress:
      external: false
      targetPort: 8080
    secrets:
      - name: slack-bot-token
        value: ${SLACK_BOT_TOKEN}
      - name: openai-api-key
        value: ${OPENAI_API_KEY}
  template:
    containers:
      - name: slack-bot
        image: myregistry.azurecr.io/slack-bot:latest
        resources:
          cpu: 0.5
          memory: 1Gi
        env:
          - name: SLACK_BOT_TOKEN
            secretRef: slack-bot-token
          - name: OPENAI_API_KEY
            secretRef: openai-api-key
    scale:
      minReplicas: 1
      maxReplicas: 5

Deploy:

# Build and push container
docker build -t myregistry.azurecr.io/slack-bot:latest .
docker push myregistry.azurecr.io/slack-bot:latest

# Deploy to Azure Container Apps
az containerapp create \
  --name promptflow-slack-bot \
  --resource-group my-resource-group \
  --image myregistry.azurecr.io/slack-bot:latest \
  --environment my-container-env \
  --secrets slack-bot-token=$SLACK_BOT_TOKEN openai-api-key=$OPENAI_API_KEY \
  --env-vars SLACK_BOT_TOKEN=secretref:slack-bot-token OPENAI_API_KEY=secretref:openai-api-key

Option 2: Azure AI Studio Deployment

# Deploy flow to Azure AI
pf flow build --source ./flows/slack-monitor --output ./build --format docker

# Create managed endpoint
az ml online-endpoint create --file endpoint.yaml

# Deploy the flow
az ml online-deployment create --file deployment.yaml
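The endpoint.yaml and deployment.yaml referenced here are not shown above; a minimal endpoint definition might look like the following (the endpoint name is a placeholder):

```yaml
# endpoint.yaml — minimal sketch; the name is illustrative
$schema: https://azuremlschemas.azureedge.net/latest/managedOnlineEndpoint.schema.json
name: slack-monitor-endpoint
auth_mode: key
```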

Option 3: Self-Hosted with Docker Compose

# docker-compose.yaml
version: '3.8'
services:
  slack-monitor-bot:
    build:
      context: ./flows/slack-monitor
      dockerfile: Dockerfile
    environment:
      - SLACK_BOT_TOKEN=${SLACK_BOT_TOKEN}
      - SLACK_APP_TOKEN=${SLACK_APP_TOKEN}
      - AZURE_OPENAI_API_KEY=${AZURE_OPENAI_API_KEY}
      - AZURE_OPENAI_ENDPOINT=${AZURE_OPENAI_ENDPOINT}
    restart: unless-stopped
    volumes:
      - ./config:/app/config:ro
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  security-review-bot:
    build:
      context: ./flows/security-review
      dockerfile: Dockerfile
    environment:
      - SLACK_BOT_TOKEN=${SLACK_BOT_TOKEN}
      - SLACK_APP_TOKEN=${SLACK_APP_TOKEN}
      - AZURE_OPENAI_API_KEY=${AZURE_OPENAI_API_KEY}
      - AZURE_OPENAI_ENDPOINT=${AZURE_OPENAI_ENDPOINT}
    restart: unless-stopped
    volumes:
      - ./config:/app/config:ro
      - /tmp/security-scans:/tmp/security-scans

Production Dockerfile

# Dockerfile
FROM python:3.11-slim

# Install system dependencies for security tools
RUN apt-get update && apt-get install -y \
    git \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Install security scanning tools
RUN pip install bandit safety semgrep

# Copy application code
COPY . .

# Create non-root user
RUN useradd -m appuser && chown -R appuser:appuser /app
USER appuser

# Health check endpoint
EXPOSE 8080

CMD ["python", "main.py"]

Monitoring and Observability

# monitoring.py
import logging
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# Configure tracing
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

otlp_exporter = OTLPSpanExporter(endpoint="http://otel-collector:4317")
span_processor = BatchSpanProcessor(otlp_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

def log_flow_execution(flow_name: str, inputs: dict, outputs: dict, duration: float):
    """Log flow execution for monitoring."""
    with tracer.start_as_current_span(f"flow.{flow_name}") as span:
        span.set_attribute("flow.name", flow_name)
        span.set_attribute("flow.duration_ms", duration * 1000)
        span.set_attribute("flow.input_size", len(str(inputs)))
        span.set_attribute("flow.output_size", len(str(outputs)))
        
        logger.info(
            f"Flow executed: {flow_name}",
            extra={
                "flow_name": flow_name,
                "duration": duration,
                "success": True
            }
        )
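To give `log_flow_execution` a duration, you need to time the flow at its call site. The decorator below is a self-contained sketch of that pattern; the names are illustrative, and it logs directly rather than calling the helper above:

```python
import functools
import logging
import time

logger = logging.getLogger(__name__)

def timed_flow(flow_name: str):
    """Decorator sketch: time a flow call and log the duration."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            result = fn(*args, **kwargs)
            duration = time.perf_counter() - start
            logger.info("Flow executed: %s in %.1f ms", flow_name, duration * 1000)
            return result
        return wrapper
    return decorator

# Hypothetical flow callable used only to demonstrate the decorator
@timed_flow("slack-monitor")
def example_flow(message: str) -> dict:
    return {"topic": "demo", "urgency": "low"}
```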

Comparison with Alternative Frameworks

LangChain

LangChain is a popular framework for building LLM applications.

Strengths

| Feature      | LangChain                        | Promptflow                          |
|--------------|----------------------------------|-------------------------------------|
| Community    | Larger ecosystem, more tutorials | Microsoft-backed, Azure integration |
| Chains       | Flexible chain composition       | DAG-based flow structure            |
| Agents       | Sophisticated agent framework    | Simpler agent patterns              |
| Memory       | Multiple memory types built-in   | Requires custom implementation      |
| Integrations | 100+ LLM/tool integrations       | Focused on Azure ecosystem          |

When to Choose LangChain

  • Need extensive third-party integrations
  • Building complex agent systems with sophisticated reasoning
  • Prefer Python-first development
  • Require advanced memory/state management

LangChain Equivalent Implementation

from langchain.chat_models import ChatOpenAI
from langchain.agents import initialize_agent, AgentType
from langchain.tools import tool
from langchain.prompts import ChatPromptTemplate

# Define tools
@tool
def analyze_message(message: str) -> str:
    """Analyze a Slack message for relevance."""
    llm = ChatOpenAI(model="gpt-4o-mini")
    prompt = ChatPromptTemplate.from_messages([
        ("system", "Analyze the message for topic, urgency, and relevance."),
        ("user", "{message}")
    ])
    chain = prompt | llm
    # chain.invoke returns an AIMessage; return its text content
    return chain.invoke({"message": message}).content

@tool
def notify_stakeholder(stakeholder_id: str, summary: str) -> str:
    """Send notification to a stakeholder."""
    # Implementation here
    return f"Notified {stakeholder_id}"

# Create agent (initialize_agent is the classic agent API; newer
# LangChain releases steer agent work toward LangGraph)
llm = ChatOpenAI(model="gpt-4o")
agent = initialize_agent(
    tools=[analyze_message, notify_stakeholder],
    llm=llm,
    agent=AgentType.OPENAI_FUNCTIONS,
    verbose=True
)

result = agent.run("Monitor this channel message and notify relevant parties")

Semantic Kernel

Semantic Kernel is Microsoft’s SDK for integrating LLMs into applications.

Strengths

| Feature          | Semantic Kernel                | Promptflow           |
|------------------|--------------------------------|----------------------|
| Language Support | C#, Python, Java               | Python               |
| Enterprise Focus | Strong enterprise patterns     | Azure AI focus       |
| Plugin System    | Extensible plugin architecture | Node-based tools     |
| Memory           | Semantic memory built-in       | External integration |
| Orchestration    | Planner-based orchestration    | DAG-based flows      |

When to Choose Semantic Kernel

  • Working in a .NET/C# environment
  • Need enterprise-grade patterns
  • Building applications with semantic memory
  • Prefer SDK-style development

Semantic Kernel Equivalent Implementation

import os

import semantic_kernel as sk
from semantic_kernel.connectors.ai.open_ai import OpenAIChatCompletion

# Initialize kernel (this uses the legacy v0.x Semantic Kernel Python API)
api_key = os.environ["OPENAI_API_KEY"]
kernel = sk.Kernel()
kernel.add_chat_service("gpt4", OpenAIChatCompletion("gpt-4o", api_key))

# Define semantic function
analyze_message = kernel.create_semantic_function(
    """
    Analyze the following Slack message and classify its relevance:
    
    Message: {{$message}}
    
    Provide:
    1. Topic
    2. Urgency (high/medium/low)
    3. Relevant stakeholders
    """,
    function_name="analyze_message",
    plugin_name="SlackMonitor"
)

# Define native function
@sk.kernel_function(name="notify_stakeholder")
def notify_stakeholder(stakeholder_id: str, summary: str) -> str:
    """Send notification to stakeholder."""
    # Implementation
    return f"Notified {stakeholder_id}"

kernel.register_native_function(notify_stakeholder)

# Create plan and execute (planner APIs are async, so run them in an event loop)
import asyncio
from semantic_kernel.planning import SequentialPlanner

async def main():
    planner = SequentialPlanner(kernel)
    plan = await planner.create_plan_async("Analyze the message and notify stakeholders")
    return await plan.invoke_async()

result = asyncio.run(main())

Comparison Summary

| Aspect             | Promptflow          | LangChain        | Semantic Kernel |
|--------------------|---------------------|------------------|-----------------|
| Learning Curve     | Moderate            | Steep            | Moderate        |
| Visual Development | Yes (flow diagrams) | No               | No              |
| Testing/Evaluation | Built-in            | Separate tools   | Basic           |
| Azure Integration  | Native              | Via integrations | Native          |
| Community Size     | Growing             | Large            | Medium          |
| Enterprise Ready   | Yes                 | Yes              | Yes             |
| Best For           | Azure-centric orgs  | Complex agents   | .NET shops      |

Choosing the Right Framework

Choose Promptflow when:

  • Your organization uses Azure AI services
  • You prefer visual flow development
  • You need built-in evaluation and testing
  • You want straightforward deployment to Azure

Choose LangChain when:

  • You need maximum flexibility and integrations
  • Building complex multi-agent systems
  • You have diverse LLM provider requirements
  • Community support and examples are important

Choose Semantic Kernel when:

  • Working primarily in C# or Java
  • Need semantic memory capabilities
  • Following Microsoft’s enterprise patterns
  • Building applications alongside existing .NET infrastructure

Conclusion

Microsoft Promptflow provides a comprehensive framework for developing, testing, and deploying AI agents. Its strengths lie in its visual development experience, built-in evaluation capabilities, and seamless integration with Azure AI services.

Through our two use cases—a Slack channel monitor and a security review bot—we’ve demonstrated how Promptflow can be used to build practical, production-ready AI agents that provide real value to organizations. The framework’s DAG-based approach to flow design makes complex workflows manageable and testable.

When compared to alternatives like LangChain and Semantic Kernel, Promptflow offers a middle ground: more structure than LangChain’s flexibility, with more accessibility than Semantic Kernel’s enterprise focus. The choice ultimately depends on your organization’s technology stack, team expertise, and specific requirements.

Key Takeaways

  1. Start with clear flows: Promptflow’s DAG structure encourages well-defined, testable workflows
  2. Leverage built-in evaluation: Use the evaluation framework from day one to ensure quality
  3. Consider costs early: Model selection and caching strategies significantly impact operational costs
  4. Test iteratively: The local testing capabilities allow rapid iteration before deployment
  5. Plan for production: Promptflow’s Azure integration makes production deployment straightforward

Resources