PgFlow Demo

Visualize PgFlow workflow execution in real-time.

Powered by PostgreSQL (PGMQ, pg_cron, PgFlow), Elixir, and Phoenix LiveView.

Click on Workflow nodes, Flow DSL steps, the Cron DSL, or Event Log entries to highlight the corresponding elements and view the Step Output.

Ready

Workflow

Fetch → Markdown → Summarize / Keywords → Publish

Event Log

No events yet

Start a flow to see events

Flow DSL

defmodule PgflowDemo.Flows.ArticleFlow do
  @moduledoc """
  Demo flow that processes an article URL through multiple steps.

  DAG Structure:
  ```
  fetch_article → convert_to_markdown → summarize        → publish
                                      ↘ extract_keywords ↗
  ```

  Compiles to: `pgmq.article_flow` queue and `pgflow.flows`/`pgflow.steps` rows in Postgres.
  """

  use PgFlow.Flow
  @flow queue: :article_flow, max_attempts: 3, base_delay: 5, timeout: 120

  # Fetch the article HTML from the URL
  step :fetch_article do
    fn input, _ctx ->
      url = input["url"]

      case Req.get(url, receive_timeout: 30_000) do
        {:ok, %{status: status, body: body}} when status in 200..299 ->
          %{
            "url" => url,
            "html" => body,
            "fetched_at" => DateTime.utc_now() |> DateTime.to_iso8601()
          }

        {:ok, %{status: status}} ->
          raise "Failed to fetch article: HTTP #{status}"

        {:error, reason} ->
          raise "Failed to fetch article: #{inspect(reason)}"
      end
    end
  end

  # Convert HTML to Markdown
  step :convert_to_markdown, depends_on: [:fetch_article] do
    fn deps, _ctx ->
      html = deps["fetch_article"]["html"]

      # Parse HTML and extract main content
      markdown = html_to_markdown(html)

      %{
        "markdown" => markdown,
        "char_count" => String.length(markdown)
      }
    end
  end

  # Summarize the article using LLM (runs in parallel with extract_keywords)
  step :summarize, depends_on: [:convert_to_markdown] do
    fn deps, _ctx ->
      markdown = deps["convert_to_markdown"]["markdown"]

      case PgflowDemo.LLM.summarize(markdown) do
        {:ok, summary} ->
          %{"summary" => summary}

        {:error, reason} ->
          raise "LLM summarization failed: #{reason}"
      end
    end
  end

  # Extract keywords using LLM (runs in parallel with summarize)
  step :extract_keywords, depends_on: [:convert_to_markdown] do
    fn deps, _ctx ->
      markdown = deps["convert_to_markdown"]["markdown"]

      case PgflowDemo.LLM.extract_keywords(markdown) do
        {:ok, keywords} ->
          %{"keywords" => keywords}

        {:error, reason} ->
          raise "LLM keyword extraction failed: #{reason}"
      end
    end
  end

  # Combine all results into final output
  step :publish, depends_on: [:summarize, :extract_keywords] do
    fn deps, _ctx ->
      %{
        "summary" => deps["summarize"]["summary"],
        "keywords" => deps["extract_keywords"]["keywords"],
        "published_at" => DateTime.utc_now() |> DateTime.to_iso8601()
      }
    end
  end
end

This Flow DSL is compiled into the queue and flow/step definitions that live in PostgreSQL.
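As a rough illustration of that compilation step, the Postgres artifacts named in the moduledoc above (the `pgmq.article_flow` queue and the `pgflow.flows`/`pgflow.steps` rows) might look something like the following sketch. Only `pgmq.create` is a real PGMQ function here; the `pgflow` table and column names are assumptions for illustration.

```sql
-- Sketch only: the concrete schema PgFlow uses is not shown in this demo.
SELECT pgmq.create('article_flow');

INSERT INTO pgflow.flows (flow_slug, max_attempts, base_delay, timeout)
VALUES ('article_flow', 3, 5, 120);

-- One row per step, with its dependency list mirroring the DAG.
INSERT INTO pgflow.steps (flow_slug, step_slug, depends_on)
VALUES
  ('article_flow', 'fetch_article',       '{}'),
  ('article_flow', 'convert_to_markdown', '{fetch_article}'),
  ('article_flow', 'summarize',           '{convert_to_markdown}'),
  ('article_flow', 'extract_keywords',    '{convert_to_markdown}'),
  ('article_flow', 'publish',             '{summarize,extract_keywords}');
```

Because both `summarize` and `extract_keywords` depend only on `convert_to_markdown`, they become independent rows in the dependency graph and can execute in parallel.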

Step Output

No output yet

Run a flow or click a completed step to view its output

Cron DSL — Scheduled cleanup job

Schedule: @hourly
Next run: in 2 minutes
Retention: 24 hours
Status: Active

defmodule PgflowDemo.Jobs.ArticleFlowCleanup do
  @moduledoc """
  Hourly cleanup job that prunes old article_flow run records.

  Compiles to: `cron.schedule(...)` in Postgres via pg_cron extension.
  """
  use PgFlow.Job

  alias PgFlow.Queries.Flows, as: FlowQueries
  alias PgflowDemo.Repo

  # Require to ensure module is compiled before macro expansion
  require FlowQueries

  @job queue: :article_flow_cleanup,
       max_attempts: 3,
       timeout: 60,
       cron: [
         schedule: "@hourly",
         input: %{"retention_hours" => 24}
       ]

  perform do
    fn input, _ctx ->
      retention_hours = input["retention_hours"] || 24

      {:ok, result} =
        FlowQueries.prune_data(
          Repo,
          retention_hours,
          flow_slugs: ["article_flow"]
        )

      Map.merge(result, %{
        retention_hours: retention_hours,
        flow_slugs: ["article_flow"],
        completed_at: DateTime.utc_now() |> DateTime.to_iso8601()
      })
    end
  end
end

Scheduled job that prunes old article_flow runs hourly.
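The `cron:` option in the `@job` attribute above compiles to a `cron.schedule(...)` call, as the moduledoc notes. A sketch of what that registration might look like, assuming PgFlow enqueues the job's input onto its PGMQ queue; `cron.schedule` and `pgmq.send` are real pg_cron/PGMQ functions, but the generated command body is an assumption:

```sql
-- Sketch only: register an hourly job that enqueues the cleanup input.
SELECT cron.schedule(
  'article_flow_cleanup',
  '@hourly',
  $$SELECT pgmq.send('article_flow_cleanup', '{"retention_hours": 24}')$$
);
```

A worker then consumes the message from the `article_flow_cleanup` queue and runs the `perform` function defined in the module.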