RisingWave MCP Server
RisingWave MCP Server is a comprehensive Model Context Protocol (MCP) server that lets you query and manage your RisingWave streaming database using natural language through AI assistants like VS Code Copilot and Claude Desktop.
Features
- Real-time access to RisingWave tables, materialized views, and streaming data
- Complete management of sources, sinks, connections, indexes, and UDFs
- Streaming job monitoring with fragment, actor, and backfill tracking
- Storage analysis with Hummock/compaction insights
- Built on FastMCP and risingwave-py with high-performance STDIO transport
- Seamless integration with VS Code Copilot, Claude Desktop, and other MCP-compatible tools
Installation
git clone https://github.com/risingwavelabs/risingwave-mcp.git
cd risingwave-mcp
pip install -r requirements.txt
Setting Up
You'll need a running RisingWave instance—either locally or in the cloud.
Option 1: Run RisingWave Locally
# Install RisingWave standalone
curl -L https://risingwave.com/sh | sh
# macOS
risingwave
# Linux
./risingwave
For Docker or other options, see the docs:
https://docs.risingwave.com/get-started/quickstart
Option 2: Use RisingWave Cloud
You can also spin up a free-tier cluster in seconds:
https://cloud.risingwave.com/auth/signin
Integration
VS Code Copilot
- In the VS Code Chat panel: Agent Mode → Select Tools → Create MCP Server.
- Add the following to .vscode/mcp.json:
Option 1: Use a connection string
{
"servers": {
"risingwave-mcp": {
"type": "stdio",
"command": "python",
"args": ["path_to/risingwave-mcp/src/main.py"],
"env": {
"RISINGWAVE_CONNECTION_STR": "postgresql://root:root@localhost:4566/dev"
}
}
}
}
Explanation:
- postgresql:// — use the PostgreSQL wire protocol (RisingWave is compatible)
- root:root@ — username and password
- localhost:4566 — host and port
- /dev — database name
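Because the connection string is a standard PostgreSQL URL, the components above can be pulled apart with Python's standard library; a minimal sketch using the sample string from the config:

```python
from urllib.parse import urlparse

# Parse the example connection string into its components.
conn_str = "postgresql://root:root@localhost:4566/dev"
parts = urlparse(conn_str)

print(parts.scheme)            # postgresql
print(parts.username)          # root
print(parts.password)          # root
print(parts.hostname)          # localhost
print(parts.port)              # 4566
print(parts.path.lstrip("/"))  # dev
```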
Option 2: Use individual parameters
{
"servers": {
"risingwave-mcp": {
"type": "stdio",
"command": "python",
"args": ["path_to/risingwave-mcp/src/main.py"],
"env": {
"RISINGWAVE_HOST": "localhost",
"RISINGWAVE_PORT": "4566",
"RISINGWAVE_USER": "root",
"RISINGWAVE_PASSWORD": "root",
"RISINGWAVE_DATABASE": "dev",
"RISINGWAVE_SSLMODE": "disable"
}
}
}
}
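The individual parameters map one-to-one onto environment variables read by the server process. As an illustrative sketch (the helper name and defaults here are assumptions mirroring the sample config, not the project's actual API), a process could assemble them like this:

```python
import os

def load_settings(env=os.environ):
    # Fall back to the sample-config values when a variable is unset.
    return {
        "host": env.get("RISINGWAVE_HOST", "localhost"),
        "port": int(env.get("RISINGWAVE_PORT", "4566")),
        "user": env.get("RISINGWAVE_USER", "root"),
        "password": env.get("RISINGWAVE_PASSWORD", "root"),
        "database": env.get("RISINGWAVE_DATABASE", "dev"),
        "sslmode": env.get("RISINGWAVE_SSLMODE", "disable"),
    }

print(load_settings({}))
```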
Start chatting!
Ask questions like:
- "List my tables"
- "Create a materialized view that aggregates payments by minute"
- "Show me the backfill progress for all streaming jobs"
- "What's the cluster status?"
Claude Desktop
- Add the MCP server to your claude_desktop_config.json under mcpServers:
{
"mcpServers": {
"risingwave-mcp": {
"command": "python",
"args": ["path_to/risingwave-mcp/src/main.py"],
"env": {
"RISINGWAVE_CONNECTION_STR": "postgresql://root:root@localhost:4566/dev"
}
}
}
}
- Restart Claude Desktop to apply changes.
Manual Testing (Dev / CI)
You can run the MCP server directly from the CLI:
python src/main.py
This will listen for MCP messages over STDIN/STDOUT.
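The messages exchanged over STDIN/STDOUT are JSON-RPC 2.0, one JSON object per line. A minimal sketch of the request a client writes to list the server's tools (the "id" value is arbitrary):

```python
import json

# JSON-RPC 2.0 request asking an MCP server for its tool catalog.
request = {"jsonrpc": "2.0", "method": "tools/list", "id": 1}

# The STDIO transport frames each message as a single line of JSON.
line = json.dumps(request)
print(line)

# The server replies on STDOUT with a matching "id" and a "result"
# payload describing each available tool.
```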
Available Tools
The MCP server provides 100+ tools organized into the following categories:
Query Tools
| Tool Name | Description |
|---|---|
| run_select_query | Execute a SELECT query against RisingWave |
| table_row_count | Get the row count for a specific table |
| get_table_stats | Get comprehensive statistics for a table |
Explain Tools
| Tool Name | Description |
|---|---|
| explain_query | Get query execution plan without running it |
| explain_analyze | Execute EXPLAIN ANALYZE for detailed runtime statistics |
| explain_distsql | Get distributed execution plan for streaming statements |
Schema Tools
| Tool Name | Description |
|---|---|
| show_tables | List all tables in the database |
| list_databases | List all databases in the cluster |
| describe_table | Describe table structure (columns, types) |
| describe_materialized_view | Describe materialized view structure |
| show_create_table | Show CREATE TABLE statement |
| show_create_materialized_view | Show CREATE MATERIALIZED VIEW statement |
| check_table_exists | Check if a table or MV exists |
| list_schemas | List all schemas in the database |
| list_materialized_views | List all materialized views in a schema |
| get_table_columns | Get detailed column information |
| list_subscriptions | List all subscriptions in a schema |
| list_table_privileges | List privileges for a table |
| list_sinks | List all sinks in the database |
| show_create_sink | Show CREATE SINK statement |
| get_relation_info | Get info about any relation by name |
| show_create_source | Show CREATE SOURCE statement |
DDL Tools (Data Definition)
| Tool Name | Description |
|---|---|
| create_schema | Create a new schema |
| drop_table | Drop a table |
| add_table_column | Add a column to a table |
| drop_table_column | Remove a column from a table |
| rename_table | Rename a table |
| alter_table_parallelism | Change table parallelism |
| alter_table_source_rate_limit | Modify source rate limit for tables |
| create_materialized_view | Create a new materialized view |
| drop_materialized_view | Drop a materialized view |
| alter_mv_parallelism | Change MV parallelism |
| alter_mv_backfill_rate_limit | Modify MV backfill rate limit |
| rename_materialized_view | Rename a materialized view |
| swap_materialized_views | Atomically swap two MVs |
| execute_ddl_statement | Execute generic DDL statements |
| create_kafka_table | Create a table with Kafka connector |
| add_table_comment | Add comment to a table |
| add_column_comment | Add comment to a column |
DML Tools (Data Manipulation)
| Tool Name | Description |
|---|---|
| insert_single_row | Insert a single row into a table |
| insert_multiple_rows | Batch insert multiple rows |
| update_rows | Update rows in a table |
| delete_rows | Delete rows from a table |
Source Tools
| Tool Name | Description |
|---|---|
| list_sources | List all sources |
| describe_source | Describe source structure |
| alter_source_parallelism | Change source parallelism |
| alter_source_rate_limit | Modify source ingestion rate |
| refresh_source_schema | Refresh schema from registry |
| rename_source | Rename a source |
| drop_source | Drop a source |
Sink Tools
| Tool Name | Description |
|---|---|
| describe_sink | Describe sink structure |
| alter_sink_parallelism | Change sink parallelism |
| alter_sink_rate_limit | Modify sink output rate |
| rename_sink | Rename a sink |
| drop_sink | Drop a sink |
Connection Tools
| Tool Name | Description |
|---|---|
| list_connections | List all connections |
| show_create_connection | Show CREATE CONNECTION statement |
| drop_connection | Drop a connection |
Secret Tools
| Tool Name | Description |
|---|---|
| list_secrets | List all secrets (values hidden) |
| drop_secret | Drop a secret |
Index Tools
| Tool Name | Description |
|---|---|
| list_indexes | List indexes for a table |
| describe_index | Describe index structure |
| show_create_index | Show CREATE INDEX statement |
| drop_index | Drop an index |
Function Tools (UDFs)
| Tool Name | Description |
|---|---|
| list_functions | List all user-defined functions |
| show_create_function | Show CREATE FUNCTION statement |
| drop_function | Drop a function |
User & Access Control Tools
| Tool Name | Description |
|---|---|
| list_users | List all users |
| get_user_privileges | Get privileges for a user |
| get_schema_privileges | Get schema-level privileges |
| get_database_privileges | Get database-level privileges |
Cluster Tools
| Tool Name | Description |
|---|---|
| show_cluster | Display cluster node details |
| show_jobs | List active streaming jobs |
| cancel_jobs | Cancel running jobs |
| recover_cluster | Trigger manual recovery |
| get_cluster_info | Get comprehensive cluster info |
Database Management Tools
| Tool Name | Description |
|---|---|
| get_database_version | Get RisingWave version info |
| show_running_queries | Show currently running queries |
| flush_database | Force flush pending writes |
Session Tools
| Tool Name | Description |
|---|---|
| set_session_variable | Set a session variable |
| show_session_variable | Show a session variable value |
| show_all_session_variables | Show all session variables |
| alter_system_variable | Alter system-wide defaults |
Iceberg Tools
| Tool Name | Description |
|---|---|
| vacuum_table | Expire old snapshots on Iceberg table |
| vacuum_full_table | Full compaction + vacuum |
Streaming Infrastructure Tools
| Tool Name | Description |
|---|---|
| get_worker_nodes | Get all worker nodes with status |
| get_actor_distribution | Show actor count per worker |
| get_fragment_stats | Get fragment statistics |
| get_fragment_communication_cost | Calculate shuffle overhead |
| list_fragments | List all fragments |
| get_fragment_details | Get detailed fragment info |
| list_actors | List all actors |
| get_actors_for_fragment | Get actors for a fragment |
| get_object_id_by_name | Get internal ID by name |
| get_mv_by_fragment_id | Find MV by fragment ID |
| get_mv_by_actor_id | Trace actor to MV |
| get_worker_for_actor | Find worker for actor |
| get_workers_for_fragment | Find workers for fragment |
| get_streaming_job_fragments | Get fragments for a job |
| get_streaming_job_actors | Get actors for a job |
| get_backfill_progress | Get backfill progress |
| get_fragment_backfill_progress | Get fragment-level backfill |
| get_upstream_fragments | Get upstream fragments |
| get_downstream_fragments | Get downstream fragments |
| get_fragments_by_state_table | Find fragments by state table |
| get_streaming_parallelism | Get parallelism settings |
| get_mv_worker_distribution | Get MV actor distribution |
| get_job_by_state_table_id | Find job by state table |
| get_internal_tables | Get internal state tables |
| check_vnode_distribution | Check for data skew |
| get_downstream_dependents | Find dependent MVs |
| get_upstream_dependents | Find upstream dependencies |
| get_sink_decouple_status | Check sink decouple status |
| get_sink_logstore_lag | Monitor decoupled sink lag |
Storage Tools (Hummock)
| Tool Name | Description |
|---|---|
| get_table_storage_stats | Get storage stats for a table |
| get_all_table_storage_stats | Get storage stats for all tables |
| get_sst_level_layout | Check SST layout per level |
| get_compaction_group_configs | Get compaction configs |
| get_sst_delete_ratio | Check SST delete ratios |
| get_sst_vnode_distribution | Check vnode distribution in SSTs |
| get_large_state_tables_in_compaction_group | Find large state tables |
| get_l0_stats | Get L0 level statistics |
| get_jobs_in_compaction_group | Find jobs in compaction group |
| get_meta_snapshot_info | Get meta snapshot info |
| get_compaction_group_summary | Get compaction group summary |
System Catalog Tools
| Tool Name | Description |
|---|---|
| get_table_constraints | Get table constraints |
| get_views_info | Get views information |
| get_all_objects_in_schema | List all objects in schema |
| get_object_comments | Get object comments |
| get_streaming_job_status | Get streaming job status |
| get_system_tables_info | List system tables |
| query_system_catalog | Query any system catalog |
Architecture
src/
├── main.py # Entry point, registers all tools
├── connection.py # Database connection setup
├── query_tools.py # SELECT queries and table stats
├── explain.py # EXPLAIN and EXPLAIN ANALYZE
├── schema_tools.py # Schema inspection tools
├── ddl_tools.py # CREATE, ALTER, DROP operations
├── dml_tools.py # INSERT, UPDATE, DELETE operations
├── source_tools.py # Source management
├── sink_tools.py # Sink management
├── connection_tools.py # Connection management
├── secret_tools.py # Secret management
├── index_tools.py # Index management
├── function_tools.py # UDF management
├── user_tools.py # User and access control
├── cluster_tools.py # Cluster management
├── management_tools.py # Database management
├── session_tools.py # Session variables
├── iceberg_tools.py # Iceberg/vacuum operations
├── streaming_tools.py # Fragments, actors, backfill
├── storage_tools.py # Hummock/compaction analysis
└── catalog_tools.py # System catalog queries
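The layout above suggests a simple pattern: each *_tools.py module defines tool functions, and main.py collects them into one server. A plain-Python sketch of that pattern (no FastMCP dependency; the registry and example tool here are illustrative, not the project's actual code):

```python
# Module-level registry mapping tool names to callables.
TOOLS = {}

def tool(fn):
    """Register a function as an MCP tool under its own name."""
    TOOLS[fn.__name__] = fn
    return fn

@tool
def show_tables():
    # The real tool queries RisingWave's catalog; this stub
    # returns a fixture so the sketch runs standalone.
    return ["orders", "payments"]

print(sorted(TOOLS))  # ['show_tables']
```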
Example Conversations
Query Data
User: Show me all tables in the public schema
Assistant: [calls show_tables]
User: What's in the orders table?
Assistant: [calls describe_table with table_name="orders"]
User: Get me the last 10 orders
Assistant: [calls run_select_query with query="SELECT * FROM orders ORDER BY created_at DESC LIMIT 10"]
Monitor Streaming Jobs
User: What's the backfill progress?
Assistant: [calls get_backfill_progress]
User: Show me the cluster status
Assistant: [calls show_cluster]
User: Are there any running jobs?
Assistant: [calls show_jobs]
Manage Objects
User: Create a materialized view that counts orders per customer
Assistant: [calls create_materialized_view with appropriate SQL]
User: Change the parallelism of mv_orders to 4
Assistant: [calls alter_mv_parallelism with mv_name="mv_orders", parallelism="4"]
User: Drop the old_source source
Assistant: [calls drop_source with source_name="old_source"]
Debug Performance
User: Explain this query: SELECT * FROM orders WHERE status = 'pending'
Assistant: [calls explain_query]
User: What's the storage usage for each table?
Assistant: [calls get_all_table_storage_stats]
User: Check for data skew in the orders table
Assistant: [calls check_vnode_distribution with table_name="orders", distribution_key="customer_id"]
Testing
The project includes a comprehensive set of testing tools:
1. Unit Tests (with mocked connections)
# Install test dependencies
pip install pytest
# Run all unit tests
pytest tests/test_tools.py -v
# Run specific test class
pytest tests/test_tools.py::TestSchemaTools -v
2. Integration Tests (requires running RisingWave)
# Set connection string
export RISINGWAVE_CONNECTION_STR="postgresql://root:root@localhost:4566/dev"
# Run all integration tests
python tests/integration_test.py
# Run specific category
python tests/integration_test.py --category schema
python tests/integration_test.py --category cluster
python tests/integration_test.py --category streaming
# Verbose output
python tests/integration_test.py -v
Available test categories: schema, source, sink, cluster, management, session, streaming, storage, catalog, connection, secret, function, user, explain, query
3. Interactive MCP Inspector
# Start interactive mode
python tests/mcp_inspector.py
# List all tools
python tests/mcp_inspector.py --list
# List tools by category
python tests/mcp_inspector.py --list --category schema
# Run a specific tool
python tests/mcp_inspector.py --tool show_tables
# Run a tool with arguments
python tests/mcp_inspector.py --tool describe_table --args '{"table_name": "orders"}'
Interactive mode commands:
mcp> list # List all tools
mcp> list schema # List schema tools
mcp> run show_tables # Run a tool
mcp> run describe_table {"table_name": "orders"} # Run with args
mcp> help show_cluster # Show tool details
mcp> quit # Exit
4. Manual Testing with MCP CLI
If you have the MCP CLI installed:
# Test the server directly
echo '{"jsonrpc": "2.0", "method": "tools/list", "id": 1}' | python src/main.py
Contributing
Contributions are welcome! Please feel free to submit issues or pull requests.
License
This project is licensed under the Apache License 2.0 - see the LICENSE file for details.