Visualize PgFlow workflow execution in real time.
Powered by PostgreSQL (PGMQ, pg_cron, PgFlow), Elixir, and Phoenix LiveView.
Click on Workflow nodes, Flow DSL steps, the Cron DSL, or Event Log entries to highlight the corresponding elements and view the Step Output.
defmodule PgflowDemo.Flows.ArticleFlow do
  @moduledoc """
  Demo flow that processes an article URL through multiple steps.

  DAG Structure:

  ```
  fetch_article → convert_to_markdown → summarize → publish
                                      ↘ extract_keywords ↗
  ```

  Compiles to: `pgmq.article_flow` queue and `pgflow.flows`/`pgflow.steps` rows in Postgres.
  """

  use PgFlow.Flow

  @flow queue: :article_flow, max_attempts: 3, base_delay: 5, timeout: 120

  # Fetch the article HTML from the URL
  step :fetch_article do
    fn input, _ctx ->
      url = input["url"]

      case Req.get(url, receive_timeout: 30_000) do
        {:ok, %{status: status, body: body}} when status in 200..299 ->
          %{
            "url" => url,
            "html" => body,
            "fetched_at" => DateTime.utc_now() |> DateTime.to_iso8601()
          }

        {:ok, %{status: status}} ->
          raise "Failed to fetch article: HTTP #{status}"

        {:error, reason} ->
          raise "Failed to fetch article: #{inspect(reason)}"
      end
    end
  end

  # Convert HTML to Markdown
  step :convert_to_markdown, depends_on: [:fetch_article] do
    fn deps, _ctx ->
      html = deps["fetch_article"]["html"]

      # Parse HTML and extract main content
      # (html_to_markdown/1 is a helper that is not shown in this listing)
      markdown = html_to_markdown(html)

      %{
        "markdown" => markdown,
        "char_count" => String.length(markdown)
      }
    end
  end

  # Summarize the article using LLM (runs in parallel with extract_keywords)
  step :summarize, depends_on: [:convert_to_markdown] do
    fn deps, _ctx ->
      markdown = deps["convert_to_markdown"]["markdown"]

      case PgflowDemo.LLM.summarize(markdown) do
        {:ok, summary} ->
          %{"summary" => summary}

        {:error, reason} ->
          # inspect/1 keeps the message readable when reason is not a string
          raise "LLM summarization failed: #{inspect(reason)}"
      end
    end
  end

  # Extract keywords using LLM (runs in parallel with summarize)
  step :extract_keywords, depends_on: [:convert_to_markdown] do
    fn deps, _ctx ->
      markdown = deps["convert_to_markdown"]["markdown"]

      case PgflowDemo.LLM.extract_keywords(markdown) do
        {:ok, keywords} ->
          %{"keywords" => keywords}

        {:error, reason} ->
          # inspect/1 keeps the message readable when reason is not a string
          raise "LLM keyword extraction failed: #{inspect(reason)}"
      end
    end
  end

  # Combine all results into final output
  step :publish, depends_on: [:summarize, :extract_keywords] do
    fn deps, _ctx ->
      %{
        "summary" => deps["summarize"]["summary"],
        "keywords" => deps["extract_keywords"]["keywords"],
        "published_at" => DateTime.utc_now() |> DateTime.to_iso8601()
      }
    end
  end
end
This Flow DSL is compiled into the flow definition stored in PostgreSQL: the pgmq.article_flow queue plus rows in pgflow.flows and pgflow.steps.
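The depends_on declarations are what let summarize and extract_keywords run side by side. As a rough illustration only (this is not PgFlow's scheduler; the `DagSketch` module and its `waves/1` function are hypothetical names made up for this sketch), the same dependency map can be grouped into waves of steps whose dependencies are already satisfied:

```elixir
defmodule DagSketch do
  @moduledoc false

  # Groups steps into "waves". Every step in a wave depends only on steps
  # from earlier waves, so steps that share a wave could run in parallel.
  def waves(deps) when is_map(deps), do: do_waves(deps, MapSet.new(), [])

  # Nothing left to schedule: return the waves in execution order.
  defp do_waves(deps, _done, acc) when map_size(deps) == 0, do: Enum.reverse(acc)

  defp do_waves(deps, done, acc) do
    # A step is ready once all of its dependencies are in the done set.
    ready =
      deps
      |> Enum.filter(fn {_step, needs} -> Enum.all?(needs, &MapSet.member?(done, &1)) end)
      |> Enum.map(fn {step, _needs} -> step end)
      |> Enum.sort()

    if ready == [], do: raise(ArgumentError, "cycle or unknown dependency")

    do_waves(Map.drop(deps, ready), MapSet.union(done, MapSet.new(ready)), [ready | acc])
  end
end

# Dependency map copied from the depends_on options of ArticleFlow.
article_flow_deps = %{
  fetch_article: [],
  convert_to_markdown: [:fetch_article],
  summarize: [:convert_to_markdown],
  extract_keywords: [:convert_to_markdown],
  publish: [:summarize, :extract_keywords]
}

IO.inspect(DagSketch.waves(article_flow_deps))
# [[:fetch_article], [:convert_to_markdown], [:extract_keywords, :summarize], [:publish]]
```

The third wave holds both LLM steps, which matches the fork in the DAG diagram; publish only appears once both have finished.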
Schedule: Hourly (@hourly)
Next run: in 2 minutes
Retention: 24 hours
defmodule PgflowDemo.Jobs.ArticleFlowCleanup do
  @moduledoc """
  Hourly cleanup job that prunes old article_flow run records.

  Compiles to: `cron.schedule(...)` in Postgres via pg_cron extension.
  """

  use PgFlow.Job

  alias PgFlow.Queries.Flows, as: FlowQueries
  alias PgflowDemo.Repo

  # Require to ensure module is compiled before macro expansion
  require FlowQueries

  @job queue: :article_flow_cleanup,
       max_attempts: 3,
       timeout: 60,
       cron: [
         schedule: "@hourly",
         input: %{"retention_hours" => 24}
       ]

  perform do
    fn input, _ctx ->
      retention_hours = input["retention_hours"] || 24

      {:ok, result} =
        FlowQueries.prune_data(
          Repo,
          retention_hours,
          flow_slugs: ["article_flow"]
        )

      Map.merge(result, %{
        retention_hours: retention_hours,
        flow_slugs: ["article_flow"],
        completed_at: DateTime.utc_now() |> DateTime.to_iso8601()
      })
    end
  end
end
Scheduled job that prunes old article_flow runs hourly.
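To make the 24-hour retention window concrete, here is a minimal sketch of how a retention value in hours can be turned into a cutoff timestamp. How `prune_data` actually selects rows is not shown on this page, so the `RetentionSketch` module, its function names, and the idea of comparing against a run's finish time are all assumptions made for illustration:

```elixir
defmodule RetentionSketch do
  @moduledoc false

  # Returns the timestamp that marks the start of the retention window.
  # Anything older than this would fall outside the window.
  def cutoff(%DateTime{} = now, retention_hours)
      when is_integer(retention_hours) and retention_hours >= 0 do
    DateTime.add(now, -retention_hours * 3600, :second)
  end

  # True when finished_at is strictly older than the cutoff.
  # (Assumption: pruning is keyed on when a run finished.)
  def expired?(%DateTime{} = finished_at, %DateTime{} = now, retention_hours) do
    DateTime.compare(finished_at, cutoff(now, retention_hours)) == :lt
  end
end

now = ~U[2024-01-02 12:00:00Z]

IO.inspect(RetentionSketch.cutoff(now, 24))
# ~U[2024-01-01 12:00:00Z]

IO.inspect(RetentionSketch.expired?(~U[2024-01-01 11:59:59Z], now, 24))
# true
```

Passing `now` in explicitly, rather than calling `DateTime.utc_now()` inside the function, keeps the calculation easy to check against fixed timestamps.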