I wonder if anyone has experience or thoughts on how to go about monitoring Postgres replication lag?
Hey @mjtice,
The best way, I think, is to help us make the collector even better:
https://github.com/netdata/netdata/blob/master/collectors/python.d.plugin/postgres/postgres.chart.py
We could add another query that retrieves that information from PostgreSQL and then make the proper modifications to create the charts.
We have a whole guide on how to create a python collector, but this should be much easier, since we will simply extend the existing collector.
What do you think? We will help you along the way!
Hey @OdysLam, I’ve written a few postgres plugins for use internally (and I was actually just starting on this plugin this morning).
I think I’m going to take the approach the authors of the Nagios check_postgres plugin took for their replicate_row check, in that it verifies that replication is actually functioning (vs. looking at some internal counters and assuming replication is running).
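For anyone unfamiliar with that approach: replicate_row writes a value on the primary and times how long it takes to appear on the standby. A minimal sketch of the idea, assuming a hypothetical repl_heartbeat table and connection strings (this is not the actual check_postgres code):

```python
# Sketch of a replicate_row-style end-to-end replication check.
# The DSNs and the repl_heartbeat table are assumptions for illustration.
import time

import psycopg2

PRIMARY_DSN = 'host=primary.example.com dbname=postgres user=netdata'
STANDBY_DSN = 'host=standby.example.com dbname=postgres user=netdata'


def measure_replication_lag(timeout=10.0, poll_interval=0.5):
    """Write a sentinel value on the primary, then poll the standby until it
    appears; the elapsed time approximates end-to-end replication lag."""
    sentinel = str(time.time())

    # Update the heartbeat row on the primary.
    primary = psycopg2.connect(PRIMARY_DSN)
    try:
        with primary.cursor() as cur:
            # Assumes repl_heartbeat(id int primary key, value text) exists.
            cur.execute('UPDATE repl_heartbeat SET value = %s WHERE id = 1',
                        (sentinel,))
        primary.commit()
    finally:
        primary.close()

    # Poll the standby until the sentinel shows up, or give up.
    start = time.monotonic()
    standby = psycopg2.connect(STANDBY_DSN)
    standby.autocommit = True
    try:
        while time.monotonic() - start < timeout:
            with standby.cursor() as cur:
                cur.execute('SELECT value FROM repl_heartbeat WHERE id = 1')
                row = cur.fetchone()
            if row and row[0] == sentinel:
                return time.monotonic() - start  # seconds to replicate the row
            time.sleep(poll_interval)
        return None  # replication is broken or slower than the timeout
    finally:
        standby.close()
```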
At netdata we are in the process of moving all our collectors to Golang. If you are up to it, we can start working on a Golang PostgreSQL collector.
The reason we want Golang is that it’s a better experience for the user: they don’t have to install middleware (Python), they just use the binary.
We have a sort of guide for writing collectors in Golang, and @ilyam8 would be able to help you.
To get started, take a look at the contributing handbook (it’s very detailed) and the Golang guide:
Interesting. @OdysLam, is the plan to deprecate and remove the non-Golang collectors, or is it just that official collectors will be Golang?
Although not final, we will probably phase out most non-Golang collectors. That means that we will still offer them through community repos, so that people can contribute easily, but we will not be responsible for maintaining them or providing support.
They will be offered “as-is”.
That being said, postgresql is a very core collector, so it definitely will be implemented in Golang. Thus, if you start working on this, you can be certain that your contribution will become a core part of the OSS Netdata Agent.
We accept improvements to the Python Postgres collector (since we haven’t rewritten it in Go yet).
@mjtice, check the Adding more postgres metrics PR; it adds the following charts:
- Standby delta
- Standby lag
- Average number of blocking transactions in db
I think that is related to your request.
And there are more replication metrics coming.
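For reference, lag in bytes for each standby can be derived from the LSN columns in pg_stat_replication. A minimal sketch of that kind of query (PostgreSQL 10+ column names; not necessarily the exact query the PR uses):

```sql
-- Run on the primary: bytes of WAL each standby is behind.
SELECT
    application_name,
    pg_wal_lsn_diff(pg_current_wal_lsn(), sent_lsn)   AS sent_delta_bytes,
    pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn) AS replay_delta_bytes
FROM pg_stat_replication;
```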
Thanks for the info, @ilyam8. I’m glad you’re keeping Python-based collectors around. I can write those with some ease after a lot of practice (albeit they’re not super pretty).
Looking at the PR, unfortunately the query doesn’t fit what we have in place now. For example:
```sql
SELECT
    application_name,
    COALESCE(EXTRACT(EPOCH FROM write_lag)::bigint, 0)  AS write_lag,
    COALESCE(EXTRACT(EPOCH FROM flush_lag)::bigint, 0)  AS flush_lag,
    COALESCE(EXTRACT(EPOCH FROM replay_lag)::bigint, 0) AS replay_lag
FROM pg_stat_replication
WHERE application_name IS NOT NULL;
```

```
    application_name    | write_lag | flush_lag | replay_lag
------------------------+-----------+-----------+------------
 walreceiver            |         0 |         0 |          0
 walreceiver            |         0 |         0 |          0
 walreceiver            |         0 |         0 |          0
 PostgreSQL JDBC Driver |         7 |        19 |         19
 PostgreSQL JDBC Driver |         2 |        12 |         12
 PostgreSQL JDBC Driver |         1 |         1 |          1
```
I’ve written another plugin to check our replication_slots (borrowed heavily from the original postgres plugin).
```python
# -*- coding: utf-8 -*-
# Description: postgres replication_slot lag netdata python.d module
# Authors: facetoe, dangtranhoang
# SPDX-License-Identifier: GPL-3.0-or-later

try:
    import psycopg2
    from psycopg2 import extensions
    from psycopg2.extras import DictCursor
    from psycopg2 import OperationalError

    PSYCOPG2 = True
except ImportError:
    PSYCOPG2 = False

from bases.FrameworkServices.SimpleService import SimpleService

# Global variables used for connection construction.
DEFAULT_PORT = 5432
DEFAULT_USER = 'postgres'
DEFAULT_CONNECT_TIMEOUT = 2  # seconds
DEFAULT_STATEMENT_TIMEOUT = 5000  # ms

CONN_PARAM_DSN = 'dsn'
CONN_PARAM_HOST = 'host'
CONN_PARAM_PORT = 'port'
CONN_PARAM_DATABASE = 'database'
CONN_PARAM_USER = 'user'
CONN_PARAM_PASSWORD = 'password'
CONN_PARAM_CONN_TIMEOUT = 'connect_timeout'
CONN_PARAM_STATEMENT_TIMEOUT = 'statement_timeout'
CONN_PARAM_SSL_MODE = 'sslmode'
CONN_PARAM_SSL_ROOT_CERT = 'sslrootcert'
CONN_PARAM_SSL_CRL = 'sslcrl'
CONN_PARAM_SSL_CERT = 'sslcert'
CONN_PARAM_SSL_KEY = 'sslkey'
# End global variables

# Global variables required by python.d plugin.
ORDER = [
    'replication_slot_time',
    'replication_slot_bytes',
]

CHARTS = {
    'replication_slot_time': {
        'options': [None, 'seconds behind', 'seconds', 'replication_slot lag time',
                    'postgres.replication_slot_lag_time', 'line'],
        'lines': [],
    },
    'replication_slot_bytes': {
        'options': [None, 'bytes behind', 'bytes', 'replication_slot lag bytes',
                    'postgres.replication_slot_lag_bytes', 'line'],
        'lines': [],
    },
}
# End global variables

# Global variable declaring our queries.
# Note: only the hour/minute/second fields of replay_lag are summed, so a lag
# of a day or more would be underreported.
QUERIES = {
    0: """
SELECT
    CASE
        WHEN pg_replication_slots.slot_type = 'physical' THEN pg_replication_slots.slot_name
        WHEN pg_replication_slots.slot_type = 'logical' THEN pg_replication_slots.database
    END AS db_name,
    (EXTRACT(hour FROM pg_stat_replication.replay_lag) * 60 * 60
     + EXTRACT(minutes FROM pg_stat_replication.replay_lag) * 60
     + EXTRACT(seconds FROM pg_stat_replication.replay_lag))::int AS seconds_behind,
    redo_lsn - restart_lsn AS bytes_behind
FROM pg_stat_replication, pg_replication_slots, pg_control_checkpoint()
WHERE pg_stat_replication.pid = pg_replication_slots.active_pid
"""
}
# End global variables


class Service(SimpleService):
    def __init__(self, configuration=None, name=None):
        SimpleService.__init__(self, configuration=configuration, name=name)
        self.order = ORDER
        self.definitions = CHARTS
        self.queries = QUERIES
        self.configuration = configuration
        self.conn = None
        self.conn_params = dict()
        self.alive = False
        self.replication_slots = list()
        self.data = dict()

    def reconnect(self):
        return self.connect()

    def build_conn_params(self):
        conf = self.configuration
        # connection URIs: https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING
        if conf.get(CONN_PARAM_DSN):
            return {'dsn': conf[CONN_PARAM_DSN]}
        params = {
            CONN_PARAM_HOST: conf.get(CONN_PARAM_HOST),
            CONN_PARAM_PORT: conf.get(CONN_PARAM_PORT, DEFAULT_PORT),
            CONN_PARAM_DATABASE: conf.get(CONN_PARAM_DATABASE),
            CONN_PARAM_USER: conf.get(CONN_PARAM_USER, DEFAULT_USER),
            CONN_PARAM_PASSWORD: conf.get(CONN_PARAM_PASSWORD),
            CONN_PARAM_CONN_TIMEOUT: conf.get(CONN_PARAM_CONN_TIMEOUT, DEFAULT_CONNECT_TIMEOUT),
            'options': '-c statement_timeout={0}'.format(
                conf.get(CONN_PARAM_STATEMENT_TIMEOUT, DEFAULT_STATEMENT_TIMEOUT)),
        }
        # https://www.postgresql.org/docs/current/libpq-ssl.html
        ssl_params = dict(
            (k, v) for k, v in {
                CONN_PARAM_SSL_MODE: conf.get(CONN_PARAM_SSL_MODE),
                CONN_PARAM_SSL_ROOT_CERT: conf.get(CONN_PARAM_SSL_ROOT_CERT),
                CONN_PARAM_SSL_CRL: conf.get(CONN_PARAM_SSL_CRL),
                CONN_PARAM_SSL_CERT: conf.get(CONN_PARAM_SSL_CERT),
                CONN_PARAM_SSL_KEY: conf.get(CONN_PARAM_SSL_KEY),
            }.items() if v)
        if CONN_PARAM_SSL_MODE not in ssl_params and len(ssl_params) > 0:
            raise ValueError("mandatory 'sslmode' param is missing, please set")
        params.update(ssl_params)
        return params

    def connect(self):
        if self.conn:
            self.conn.close()
            self.conn = None
        try:
            self.conn = psycopg2.connect(**self.conn_params)
            self.conn.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
            self.conn.set_session(readonly=True)
        except OperationalError as error:
            self.error(error)
            self.alive = False
        else:
            self.alive = True
        return self.alive

    def check(self):
        if not PSYCOPG2:
            self.error("'python-psycopg2' package is needed to use postgres module")
            return False
        try:
            self.conn_params = self.build_conn_params()
        except ValueError as error:
            self.error('error on creating connection params : {0}'.format(error))
            return False
        if not self.connect():
            self.error('failed to connect to {0}'.format(hide_password(self.conn_params)))
            return False
        return True

    def get_data(self):
        if not self.alive and not self.reconnect():
            return None
        self.data = dict()
        try:
            cursor = self.conn.cursor(cursor_factory=DictCursor)
            cursor.execute(self.queries[0])
            # Iterate through the cursor to get our records.
            for row in cursor:
                # Create our chart line items, one dimension per slot and chart.
                for chart in ORDER:
                    dimension_id = '_'.join([row[0], chart])
                    if dimension_id not in self.charts[chart]:
                        self.charts[chart].add_dimension([dimension_id])
                    # Populate our dictionary based on the chart.
                    if chart == 'replication_slot_time':
                        self.data[dimension_id] = row[1]
                    if chart == 'replication_slot_bytes':
                        self.data[dimension_id] = row[2]
            cursor.close()
            return self.data
        except OperationalError:
            self.alive = False
            return None


def hide_password(config):
    return dict((k, v if k != 'password' else '*****') for k, v in config.items())
```
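In case anyone wants to try it: a job definition for a python.d collector like this would go in its config file under /etc/netdata/python.d/, and presumably look much like the stock postgres one (the job name and values below are just placeholders):

```yaml
local:
  name: 'local'
  host: '127.0.0.1'
  port: 5432
  user: 'postgres'
  database: 'postgres'
```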
Hey @ilyam8, I decided to just merge my chart (along with a couple of others) into the existing plugin. See: Added new postgres charts and updated standby charts to include slot_name if exists by mjtice · Pull Request #11241 · netdata/netdata · GitHub
@mjtice thanks for the PR
Monitoring Postgres Replication is exactly what I have been looking for.
I have only created simple port checks and alarms for our applications (for example, /etc/netdata/python.d/portcheck.conf), so I am a newbie.
But I do not understand how to implement the above. I did read Develop a custom data collector in Python | Learn Netdata, but I cannot see what directory I need to copy this file to: https://github.com/netdata/netdata/blob/master/collectors/python.d.plugin/postgres/postgres.chart.py
I also do not know what/where its config file is. Any guidance would be appreciated.
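For what it’s worth, the general pattern on a typical package install is: the .chart.py modules live next to the stock ones under /usr/libexec/netdata/python.d/ (the exact path can differ per distribution), and each module reads its config from /etc/netdata/python.d/<module>.conf, best edited with the bundled edit-config helper. A sketch, assuming that standard layout:

```sh
# Assumed standard install paths; adjust for your distribution.
sudo cp postgres.chart.py /usr/libexec/netdata/python.d/
sudo /etc/netdata/edit-config python.d/postgres.conf
sudo systemctl restart netdata
```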