PostgreSQL
ksimmi, 2020-09-02 13:59:54

How good/optimal is it to use a stored procedure for polling?

Hello!

More than a month ago I was working on the problem described in another question, but unfortunately I did not find the solution I wanted, namely "attach a trigger to timestamp expiry and avoid SELECTs". The polling logic is described in detail in that question, so I will not repeat the task here. In the UPD section of that question I mentioned that I saw another solution, through a stored procedure with sequencing, and that is the path I took. I managed to build what I wanted in one weekend, but this is my first experience writing stored procedures, and I am not confident that I did everything right or that it will run fast.

Given:

  • Three queue tables with the entries to poll: second_intervals_polling, minute_intervals_polling, hour_intervals_polling. The tables have an identical structure and the same indexes; the only difference is how often they are queried, i.e. the spacing between selects. In these tables I make full use of the INTERVAL and ARRAY data types;
  • A table of dequeued entries, terminated. This one is simple: it receives every record that has reached a terminal status, and those records are deleted from the three tables above;
  • A table with the basic queue settings for each type of poller. It is read at the application level before the next record is queued and does not appear in the rest of this description;
  • Three stored functions, poll_second_intervals, poll_minute_intervals and poll_hour_intervals, each of which fetches the entries with an expired timestamp from its own table. The code is given below, but the logic is simple. Each function selects the records in its table whose status is due to be requested and, for every such record, calculates the next interval based on the current iteration. If, for example, the current table holds second-scale intervals and the calculated interval is in minutes, the record is moved from the seconds table to the minutes table. In the end the function returns the list of rows that are due for polling;
  • One stored function to dequeue a record, terminate_polling;
  • One stored function to get the last element of an array, array_last, added purely for readability.


How it's used:
At the application level, depending on the type of poller (more precisely, on its polling-frequency requirements), a new record is created in one of the three tables second_intervals_polling, minute_intervals_polling, hour_intervals_polling. Let's say the entry was written to second_intervals_polling. A scheduler (for example, cron) calls the poll_second_intervals function through the application once a second. The function returns to the application all the records that are due for polling. At the database level, the record's iteration number is updated, and the next date next_poll_at is calculated from the interval of this iteration (intervals[iteration]). When poll_second_intervals is called again, the iteration is incremented and next_poll_at is calculated again from the next interval (intervals[iteration + 1]). If the interval for the current iteration is no longer measured in seconds (more than 59 seconds), the record is moved to the minute_intervals_polling table.
The same logic applies to the data in minute_intervals_polling, except that its function, poll_minute_intervals, is called once a minute, and records whose interval exceeds the allowed range are moved to the next table, hour_intervals_polling, where everything starts anew. Once a terminal status is received, the terminate_polling function is called; it removes the record from whichever of the three tables holds it and saves it to the terminated table.
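The per-row transition described above can be modelled in a few lines of Python. This is only a sketch under assumptions: `Row`, `table_for` and `advance` are hypothetical names introduced for illustration, the thresholds (under one minute, under one hour) are taken from the description, and the real work is done by the SQL functions, not by application code.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class Row:
    # Hypothetical in-memory stand-in for one queue-table record.
    iteration: int
    iteration_interval: timedelta
    intervals: list
    next_poll_at: datetime


def table_for(interval: timedelta) -> str:
    """Pick the queue table by interval size, as the description routes rows."""
    if interval < timedelta(minutes=1):
        return "second_intervals_polling"
    if interval < timedelta(hours=1):
        return "minute_intervals_polling"
    return "hour_intervals_polling"


def advance(row: Row, now: datetime) -> Row:
    """One poll step: schedule with the current interval, then pick the next one."""
    nxt = row.iteration + 1
    # PostgreSQL arrays are 1-based, so intervals[nxt] in SQL is intervals[nxt - 1] here.
    # Past the end of the array the current interval is kept (assumed fallback).
    if nxt <= len(row.intervals):
        following = row.intervals[nxt - 1]
    else:
        following = row.iteration_interval
    return Row(nxt, following, row.intervals, now + row.iteration_interval)
```

Calling `advance` repeatedly on a row whose `intervals` grow from seconds to minutes shows the point at which `table_for(row.iteration_interval)` starts naming the next table, which is when the record would be moved.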

The structure of the queue tables using second_intervals_polling as an example:
create table second_intervals_polling
(
    id                 uuid default gen_random_uuid() not null constraint second_intervals_polling_pkey primary key,
    poller_type        varchar                             not null,
    created_at         timestamp default now()             not null,
    next_poll_at       timestamp                           not null,
    iteration          integer   default 0                 not null,
    iteration_interval interval                            not null,
    intervals          interval[]                          not null,
    technical_name     varchar                             not null,
    pollable           jsonb
);


Function code:
CREATE OR REPLACE FUNCTION array_last(arr anyarray) RETURNS anyelement AS $$
    SELECT arr[array_upper(arr, 1)];
$$ LANGUAGE sql;

CREATE OR REPLACE FUNCTION poll_second_intervals() RETURNS SETOF second_intervals_polling AS $$
WITH 
    deleted AS (
        DELETE FROM second_intervals_polling
        WHERE iteration_interval >= '1 MINUTE'
        RETURNING  *
    ),
    moved AS (
        INSERT INTO minute_intervals_polling SELECT * FROM deleted RETURNING *
    ),
    updated AS (
        UPDATE second_intervals_polling
        SET iteration = iteration + 1,
            next_poll_at = NOW() + iteration_interval,
            iteration_interval = COALESCE(intervals[iteration + 1], iteration_interval)
        WHERE second_intervals_polling.id NOT IN (SELECT id FROM moved)
          AND next_poll_at <= NOW()
        RETURNING *
    )
    SELECT * FROM updated;
$$ LANGUAGE sql;


CREATE OR REPLACE FUNCTION poll_minute_intervals() RETURNS SETOF minute_intervals_polling AS $$
WITH
    deleted AS (
        DELETE FROM minute_intervals_polling
        WHERE iteration_interval >= '1 HOUR'
        RETURNING  *
    ),
    moved AS (
        INSERT INTO hour_intervals_polling SELECT * FROM deleted RETURNING *
    ),
    updated AS (
        UPDATE minute_intervals_polling
        SET iteration = iteration + 1,
            next_poll_at = NOW() + iteration_interval,
            iteration_interval = COALESCE(intervals[iteration + 1], iteration_interval)
        WHERE minute_intervals_polling.id NOT IN (SELECT id FROM moved)
          AND next_poll_at <= NOW()
        RETURNING *
    )
    SELECT * FROM updated;
$$ LANGUAGE sql;


CREATE OR REPLACE FUNCTION poll_hour_intervals() RETURNS SETOF hour_intervals_polling AS $$
WITH
    updated  AS (
        UPDATE hour_intervals_polling
        SET iteration = iteration + 1,
            next_poll_at = NOW() + COALESCE(intervals[iteration + 1], array_last(intervals))
        WHERE next_poll_at <= NOW()
        RETURNING *
    )
    SELECT * FROM updated;
$$ LANGUAGE sql;


CREATE OR REPLACE FUNCTION terminate_polling(ptype varchar, pkey jsonb)
  RETURNS TABLE(
      id uuid,
      poller_type varchar,
      created_at timestamp,
      terminated_at timestamp,
      iteration integer,
      intervals interval[],
      technical_name varchar,
      pollable jsonb
  ) AS $$
WITH deleted_per_seconds AS ( DELETE FROM second_intervals_polling WHERE pkey = pollable AND ptype = poller_type RETURNING  * ),
     deleted_per_minutes AS ( DELETE FROM minute_intervals_polling WHERE pkey = pollable AND ptype = poller_type RETURNING  * ),
     deleted_per_hours   AS ( DELETE FROM hour_intervals_polling   WHERE pkey = pollable AND ptype = poller_type RETURNING  * ),
     moved AS (
          INSERT INTO terminated
               ( id, created_at, poller_type, intervals, iteration, technical_name, pollable )
          SELECT id, created_at, poller_type, intervals, iteration, technical_name, pollable FROM deleted_per_seconds UNION
          SELECT id, created_at, poller_type, intervals, iteration, technical_name, pollable FROM deleted_per_minutes UNION
          SELECT id, created_at, poller_type, intervals, iteration, technical_name, pollable FROM deleted_per_hours
          RETURNING *
     )
SELECT * FROM moved;
$$ LANGUAGE sql;


I tested this locally and on a test bench. It works as expected, but I may have made a mistake somewhere, since this is my first attempt. What I want most now is an assessment from more knowledgeable people; in other words, I need criticism. Are there any obvious drawbacks?

P.S. I split the queue into three tables because, according to the requirements, it really does have to be queried every second, and I do not want the "non-urgent" data to sit together with the "urgent" data. It seems to me that this should work faster.

1 answer(s)
rPman, 2020-09-06
@rPman

The approach to the implementation is wrong from the start and will glitch in edge cases. On top of that, the solution you ended up with is too complicated.
If the database does not fundamentally offer the functionality you need, then no matter what contortions you go through, you will have to do it outside the database. It is better for that to be 'userspace' code rather than the database engine or an extension to it, since the cost of maintaining the result grows exponentially.
The correct approach is to have your own application running (not cron). Whenever time intervals are changed or added, it calculates which interval is due to fire next, waits until that time, and executes it.
In the database (task table), depending on your needs, you should have the following fields:
* interval creation timestamp (needed to calculate how many times the task should have run and how many times it already has).
  An important caveat appears already at this point: the time on the machine can be changed, for example set back. Depending on how important it is for you not to miss a run or trigger an extra one, you can use synthetic values, including special processor counters whose value does not change when the clock changes. You will then of course need adjustment/initialization tooling for when the service restarts, which is a separate topic and task.
* time interval between repetitions
* limit on the number of executions: the maximum number of times the task may run
* number of executions: a counter that increases each time the task runs.
  You also need to take into account that there are execution attempts and there are successful completions. For some tasks, unsuccessful attempts must be counted too, since error control in the system matters.
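The clock-adjustment remark in the list above can be illustrated in Python, whose standard library exposes a monotonic clock through `time.monotonic()`. This is a minimal sketch, not the answer's implementation: `ElapsedTimer` is a hypothetical helper, and because monotonic readings are only meaningful within one running process, a real service would still need the re-initialization step on restart that the answer mentions.

```python
import time


class ElapsedTimer:
    """Measure elapsed time in a way that ignores wall-clock adjustments."""

    def __init__(self) -> None:
        # time.monotonic() cannot go backwards, even if the system time is changed.
        self._start = time.monotonic()

    def elapsed(self) -> float:
        """Seconds since the timer was created."""
        return time.monotonic() - self._start

    def is_due(self, interval_seconds: float) -> bool:
        """True once at least interval_seconds have passed."""
        return self.elapsed() >= interval_seconds
```

The wall-clock creation timestamp stored in the table would still be used to rebuild such timers after a restart; the monotonic value itself is not worth persisting.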
All interval changes must go through your service, or you must collect database notifications about such changes (some databases provide such a tool and can deliver events through separate sockets or even over the client connection session). In the latter case you have to keep in mind that writes to the database are transactional: the interval record may be rolled back after you have already processed the event, or, conversely, the transaction may commit while the event is lost because of an error. It is therefore easier to manage intervals through your application.
When the application learns of a change to the intervals, it calculates which interval is nearest and when it will elapse, and starts a timer, which must be cancelled when new changes arrive.
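A cancellable timer of this kind can be sketched in Python with `threading.Event`, whose `wait(timeout)` returns early when the event is set. The names `nearest_deadline`, `Waker`, `notify_change` and `wait_until` are hypothetical and chosen for illustration; how the application learns about interval changes is left out.

```python
import threading
from datetime import datetime, timedelta


def nearest_deadline(next_runs):
    """Return the earliest scheduled time, or None when nothing is scheduled."""
    return min(next_runs, default=None)


class Waker:
    """Sleep until a deadline unless a change to the intervals arrives first."""

    def __init__(self) -> None:
        self._changed = threading.Event()

    def notify_change(self) -> None:
        """Called when intervals are added or modified; cancels the current wait."""
        self._changed.set()

    def wait_until(self, deadline: datetime, now: datetime) -> bool:
        """Return True if the deadline was reached, False if a change interrupted."""
        timeout = max((deadline - now).total_seconds(), 0.0)
        interrupted = self._changed.wait(timeout)
        self._changed.clear()
        return not interrupted
```

In a main loop, a `False` result means the schedule changed and the nearest deadline has to be recalculated before waiting again; a `True` result means it is time to fetch the due tasks.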
When the timer expires, you again request from the database the list of tasks that are late or must be executed now, calculating the difference between the expected number of executions, (now - creation_time)/exec_interval, and the execution counter. The limit on the number of runs is checked at the same time: if it has been exceeded, the task is deleted, not forgetting to first perform the required number of runs. For each task this gives the number of executions still owed; we launch those runs and increase the execution counter at the end of each iteration.
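The calculation of owed executions can be written out as a small Python function. It is a sketch under assumptions: `pending_runs` and its parameter names are hypothetical, partial intervals are rounded down, and a `limit` of `None` is taken to mean "no limit".

```python
from datetime import datetime, timedelta
from typing import Optional


def pending_runs(created_at: datetime, interval: timedelta, executed: int,
                 limit: Optional[int], now: datetime) -> int:
    """Number of runs that should have happened by `now` but have not yet."""
    # Expected executions so far: (now - creation_time) / exec_interval, rounded down.
    expected = (now - created_at) // interval
    if limit is not None:
        # Never schedule more runs in total than the limit allows.
        expected = min(expected, limit)
    # A clock set backwards could make this negative, so clamp at zero.
    return max(expected - executed, 0)
```

For example, a task created 35 seconds ago with a 10-second interval and one completed run has three expected runs, so two are still owed; with a limit of two runs, only one is.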
P.S. Splitting tasks across different tables by interval length will not affect performance in any way; it will only complicate the algorithm.
