Netdata Community

Thoughts on postgres replication lag

I wonder if anyone has experience or thoughts on how to go about monitoring Postgres replication lag?
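For context, two common starting points (sketched here as a hedged example, not anyone's actual setup): on the primary, `pg_stat_replication` exposes byte lag via `pg_wal_lsn_diff()` (PostgreSQL 10+); on a standby, `pg_last_xact_replay_timestamp()` gives time lag. The DSNs and the `humanize_bytes` helper below are placeholder names of my own.

```python
# Sketch: byte lag (primary side) and time lag (standby side).
# Assumes PostgreSQL 10+ and psycopg2; DSNs are placeholders.
try:
    import psycopg2  # guarded so the pure helpers work without it installed
except ImportError:
    psycopg2 = None

PRIMARY_BYTE_LAG = """
SELECT application_name,
       pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn) AS replay_byte_lag
FROM pg_stat_replication;
"""

STANDBY_TIME_LAG = """
SELECT COALESCE(EXTRACT(EPOCH FROM now() - pg_last_xact_replay_timestamp()), 0);
"""


def humanize_bytes(n):
    """Render a byte count for chart labels / log lines."""
    for unit in ("B", "KiB", "MiB", "GiB"):
        if n < 1024 or unit == "GiB":
            return "%d B" % n if unit == "B" else "%.1f %s" % (n, unit)
        n /= 1024.0


def query_all(dsn, sql):
    """Run one query and return all rows."""
    with psycopg2.connect(dsn) as conn, conn.cursor() as cur:
        cur.execute(sql)
        return cur.fetchall()


if __name__ == "__main__":
    for name, lag in query_all("host=primary dbname=postgres", PRIMARY_BYTE_LAG):
        print(name, humanize_bytes(lag))
```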

Hey @mjtice,

The best way, I think, is to help us make the collector even better:

We could add another query that pulls that information from PostgreSQL, and then make the proper modifications to create the charts.

We have a whole guide on how to create a Python collector, but this should be much easier, since we will simply extend the existing collector.

What do you think? We will help you along the way!

Hey @OdysLam, I’ve written a few Postgres plugins for use internally (and I was actually just starting on this plugin this morning).

I think I’m going to take the approach the authors of the Nagios check_postgres plugin took for their replicate_row action, in that it verifies replication is actually functioning (rather than reading internal counters and assuming replication is running).
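A rough sketch of that approach (not the plugin itself): write a sentinel value on the primary, then poll the standby until it shows up. `repl_canary`, the DSNs, and `wait_for_value` are hypothetical names, and the canary table is assumed to already exist with a single row of `id = 1`.

```python
# Sketch of a replicate_row-style active check: update a sentinel row on the
# primary, poll the standby until the new value is visible, report the delay.
import time
import uuid

try:
    import psycopg2  # guarded so the polling helper works without it installed
except ImportError:
    psycopg2 = None


def wait_for_value(read_fn, expected, timeout=10.0, interval=0.5,
                   clock=time.monotonic, sleep=time.sleep):
    """Poll read_fn() until it returns `expected` or `timeout` elapses.
    Returns the observed delay in seconds, or None on timeout."""
    start = clock()
    while clock() - start < timeout:
        if read_fn() == expected:
            return clock() - start
        sleep(interval)
    return None


def check_replication(primary_dsn, standby_dsn):
    """End-to-end check: returns replication delay in seconds, or None."""
    token = str(uuid.uuid4())
    with psycopg2.connect(primary_dsn) as primary:
        with primary.cursor() as cur:
            # repl_canary is a hypothetical one-row table created beforehand.
            cur.execute("UPDATE repl_canary SET token = %s WHERE id = 1",
                        (token,))
    standby = psycopg2.connect(standby_dsn)

    def read_token():
        with standby.cursor() as cur:
            cur.execute("SELECT token FROM repl_canary WHERE id = 1")
            row = cur.fetchone()
            return row[0] if row else None

    try:
        return wait_for_value(read_token, token)
    finally:
        standby.close()
```

The injected clock/sleep arguments make the polling loop testable without a database; the canary update keeps the check end-to-end instead of trusting counters.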

@mjtice,

At Netdata we are in the process of moving all our collectors to Golang. If you are up to it, we can start working on a Golang PostgreSQL collector.

The reason we want Golang is that it offers a better user experience: users don’t have to install an intermediary runtime (Python); they just use the binary.

We have a sort of guide for writing collectors in Golang and @ilyam8 would be able to help you :v:

To get started, take a look at the contributing handbook (it’s very detailed) and the Golang guide:

Interesting. @OdysLam, is the plan to deprecate and remove other non-Golang collectors, or is it just that official collectors will be Golang?

Although nothing is final, we will probably phase out most non-Golang collectors. That means we will still offer them through community repos, so that people can contribute easily, but we will not be responsible for maintaining them or providing support.

They will be offered “as-is”.

That being said, PostgreSQL is a very core collector, so it will definitely be implemented in Golang. Thus, if you start working on this, you can be certain that your contribution will become a core part of the OSS Netdata Agent.

We accept improvements to the Python Postgres collector (since we haven’t rewritten it in Go yet).

@mjtice check the Adding more postgres metrics PR; it adds the following charts:

  • Standby delta
  • Standby lag
  • Average number of blocking transactions in db

I think that is related to your request.


And there are more replication metrics coming.

Thanks for the info, @ilyam8. I’m glad you’re keeping Python-based collectors around. I can write those with some ease after a lot of practice (albeit they’re not super pretty).

Looking at the PR, unfortunately the query doesn’t fit what we have in place now. For example:

SELECT
    application_name,
    COALESCE(EXTRACT(EPOCH FROM write_lag)::bigint, 0) AS write_lag,
    COALESCE(EXTRACT(EPOCH FROM flush_lag)::bigint, 0) AS flush_lag,
    COALESCE(EXTRACT(EPOCH FROM replay_lag)::bigint, 0) AS replay_lag
FROM pg_stat_replication
WHERE application_name IS NOT NULL;

which returns:

    application_name    | write_lag | flush_lag | replay_lag
------------------------+-----------+-----------+------------
 walreceiver            |         0 |         0 |          0
 walreceiver            |         0 |         0 |          0
 walreceiver            |         0 |         0 |          0
 PostgreSQL JDBC Driver |         7 |        19 |         19
 PostgreSQL JDBC Driver |         2 |        12 |         12
 PostgreSQL JDBC Driver |         1 |         1 |          1

I’ve written another plugin to check our replication_slots (borrowed heavily from the original postgres plugin).

# -*- coding: utf-8 -*-
# Description: postgres replication_slot lag netdata python.d module
# Authors: facetoe, dangtranhoang
# SPDX-License-Identifier: GPL-3.0-or-later

try:
    import psycopg2
    from psycopg2 import extensions
    from psycopg2.extras import DictCursor
    from psycopg2 import OperationalError

    PSYCOPG2 = True
except ImportError:
    PSYCOPG2 = False

from bases.FrameworkServices.SimpleService import SimpleService

# Global variables used for connection construction.
DEFAULT_PORT = 5432
DEFAULT_USER = 'postgres'
DEFAULT_CONNECT_TIMEOUT = 2  # seconds
DEFAULT_STATEMENT_TIMEOUT = 5000  # ms

CONN_PARAM_DSN = 'dsn'
CONN_PARAM_HOST = 'host'
CONN_PARAM_PORT = 'port'
CONN_PARAM_DATABASE = 'database'
CONN_PARAM_USER = 'user'
CONN_PARAM_PASSWORD = 'password'
CONN_PARAM_CONN_TIMEOUT = 'connect_timeout'
CONN_PARAM_STATEMENT_TIMEOUT = 'statement_timeout'
CONN_PARAM_SSL_MODE = 'sslmode'
CONN_PARAM_SSL_ROOT_CERT = 'sslrootcert'
CONN_PARAM_SSL_CRL = 'sslcrl'
CONN_PARAM_SSL_CERT = 'sslcert'
CONN_PARAM_SSL_KEY = 'sslkey'
# End global variables

# Global variables required by python.d plugin.
ORDER = [
    'replication_slot_time',
    'replication_slot_bytes'
]

CHARTS = {
    'replication_slot_time': {
        'options': [None, 'seconds behind', 'seconds', 'replication_slot lag time', 'postgres.replication_slot_lag_time', 'line'],
        'lines': [
        ]
    },
    'replication_slot_bytes': {
        'options': [None, 'bytes behind', 'bytes', 'replication_slot lag bytes', 'postgres.replication_slot_lag_bytes', 'line'],
        'lines': [
        ]
    }
}
# End global variables

# Global variable declaring our queries
QUERIES = {
    0: """
        SELECT
          CASE
            WHEN pg_replication_slots.slot_type = 'physical' THEN pg_replication_slots.slot_name
            WHEN pg_replication_slots.slot_type = 'logical' THEN pg_replication_slots.database
          END AS db_name,
          -- EPOCH covers the whole interval; summing hours, minutes and seconds would drop days
          COALESCE(EXTRACT(EPOCH FROM pg_stat_replication.replay_lag), 0)::int AS seconds_behind,
          redo_lsn - restart_lsn AS bytes_behind
        FROM pg_stat_replication, pg_replication_slots, pg_control_checkpoint()
        WHERE pg_stat_replication.pid = pg_replication_slots.active_pid
            """
}
# End global variables

class Service(SimpleService):
    def __init__(self, configuration=None, name=None):
        SimpleService.__init__(self, configuration=configuration, name=name)
        self.order = ORDER
        self.definitions = CHARTS
        self.queries = QUERIES
        self.configuration = configuration
        self.conn = None
        self.conn_params = dict()
        self.alive = False
        self.replication_slots = list()
        self.data = dict()

    def reconnect(self):
        return self.connect()

    def build_conn_params(self):
        conf = self.configuration

        # connection URIs: https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING
        if conf.get(CONN_PARAM_DSN):
            return {'dsn': conf[CONN_PARAM_DSN]}

        params = {
            CONN_PARAM_HOST: conf.get(CONN_PARAM_HOST),
            CONN_PARAM_PORT: conf.get(CONN_PARAM_PORT, DEFAULT_PORT),
            CONN_PARAM_DATABASE: conf.get(CONN_PARAM_DATABASE),
            CONN_PARAM_USER: conf.get(CONN_PARAM_USER, DEFAULT_USER),
            CONN_PARAM_PASSWORD: conf.get(CONN_PARAM_PASSWORD),
            CONN_PARAM_CONN_TIMEOUT: conf.get(CONN_PARAM_CONN_TIMEOUT, DEFAULT_CONNECT_TIMEOUT),
            'options': '-c statement_timeout={0}'.format(
                conf.get(CONN_PARAM_STATEMENT_TIMEOUT, DEFAULT_STATEMENT_TIMEOUT)),
        }

        # https://www.postgresql.org/docs/current/libpq-ssl.html
        ssl_params = dict(
            (k, v) for k, v in {
                CONN_PARAM_SSL_MODE: conf.get(CONN_PARAM_SSL_MODE),
                CONN_PARAM_SSL_ROOT_CERT: conf.get(CONN_PARAM_SSL_ROOT_CERT),
                CONN_PARAM_SSL_CRL: conf.get(CONN_PARAM_SSL_CRL),
                CONN_PARAM_SSL_CERT: conf.get(CONN_PARAM_SSL_CERT),
                CONN_PARAM_SSL_KEY: conf.get(CONN_PARAM_SSL_KEY),
            }.items() if v)

        if CONN_PARAM_SSL_MODE not in ssl_params and len(ssl_params) > 0:
            raise ValueError("mandatory 'sslmode' param is missing, please set")

        params.update(ssl_params)

        return params

    def connect(self):
        if self.conn:
            self.conn.close()
            self.conn = None

        try:
            self.conn = psycopg2.connect(**self.conn_params)
            self.conn.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
            self.conn.set_session(readonly=True)
        except OperationalError as error:
            self.error(error)
            self.alive = False
        else:
            self.alive = True

        return self.alive

    def check(self):
        if not PSYCOPG2:
            self.error("'python-psycopg2' package is needed to use postgres module")
            return False

        try:
            self.conn_params = self.build_conn_params()
        except ValueError as error:
            self.error('error on creating connection params : {0}'.format(error))
            return False

        if not self.connect():
            self.error('failed to connect to {0}'.format(hide_password(self.conn_params)))
            return False

        return True

    def get_data(self):
        if not self.alive and not self.reconnect():
            return None

        self.data = dict()
        try:
            cursor = self.conn.cursor(cursor_factory=DictCursor)
            cursor.execute(self.queries[0])

            # Iterate through the cursor to get our records
            for row in cursor:

                # Create our chart line items.
                for chart in ORDER:
                    dimension_id = '_'.join([row[0], chart])
                    if dimension_id not in self.charts[chart]:
                        self.charts[chart].add_dimension([dimension_id])

                    # Populate our dictionary based on the chart
                    if chart == 'replication_slot_time':
                        self.data[dimension_id] = row[1]
                    if chart == 'replication_slot_bytes':
                        self.data[dimension_id] = row[2]

            cursor.close()
            return self.data
        except OperationalError:
            self.alive = False
            return None

def hide_password(config):
    return dict((k, v if k != 'password' else '*****') for k, v in config.items())


Hey @ilyam8 , I decided to try to just merge my chart (along with a couple others) into the existing plugin. See: Added new postgres charts and updated standby charts to include slot_name if exists by mjtice · Pull Request #11241 · netdata/netdata · GitHub

@mjtice thanks for the PR :+1:

Monitoring Postgres Replication is exactly what I have been looking for.
I have only created simple port checks and alarms for our applications (e.g. /etc/netdata/python.d/portcheck.conf), so I am a newbie.

But I do not understand how to implement the above. I did read Develop a custom data collector in Python | Learn Netdata, but I cannot see which directory I need to copy this file (netdata/postgres.chart.py at master · netdata/netdata · GitHub) into.
I also do not know what its config file is or where it lives. Any guidance would be appreciated.
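For reference, on a typical package install the pieces usually live roughly as below; treat the exact paths as assumptions, since they vary by distribution and install method, and `replication_slots` is just a placeholder name for the module file.

```shell
# Copy the custom module next to the bundled python.d modules
# (on many installs this is /usr/libexec/netdata/python.d/):
sudo cp replication_slots.chart.py /usr/libexec/netdata/python.d/

# Create its config with the same base name under /etc/netdata/python.d/:
sudo /etc/netdata/edit-config python.d/replication_slots.conf

# Enable the module in python.d.conf, then restart the agent:
sudo /etc/netdata/edit-config python.d.conf
sudo systemctl restart netdata

# Debug run without restarting the agent:
sudo -u netdata /usr/libexec/netdata/plugins.d/python.d.plugin replication_slots debug trace
```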