PostgreSQL 流式处理应用实践 - 二手商品实时归类(异步消息notify/listen、阅后即焚)

本文涉及的产品
云原生数据库 PolarDB MySQL 版,Serverless 5000PCU 100GB
云原生数据库 PolarDB 分布式版,标准版 2核8GB
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介:

标签

PostgreSQL , rule , trigger , 归类 , json , udf , 异步消息 , listen , notify


背景

因为二手商品没有太多的活动、硬性分类,广告等活动,所以购买或者销售速度没有新商品那么快。为了提高二手商品的销售效率,需要提供一套归类策略。

当商品新增或商品内容发生变化时,需要根据商品属性,以及定义的规则,实时进行商品归类(鱼塘,圈子等)方便用户查询。

结构设计

1、商品ID,属性

create table a (      
  id int8 primary key,   -- 商品ID      
  att jsonb   -- 商品属性      
);      

属性设计为JSON,JSON里面是K-V的属性对,V里面是数组,包含K的值以及这对属性的最后更新时间。

更新时间用于merge insert,当属性发生变化时才更新,没有发生变化时,不更新。

所以json需要遍历,并做合并处理。

合并JSON属性的UDF

create or replace function merge_json(jsonb, jsonb) returns jsonb as $$    
  select jsonb_object_agg(key,value) from (    
  select     
    coalesce(a.key, b.key) as key,     
    case     
    when     
    coalesce(jsonb_array_element(a.value,1)::text::timestamp, '1970-01-01'::timestamp)     
    >     
    coalesce(jsonb_array_element(b.value,1)::text::timestamp, '1970-01-01'::timestamp)     
    then a.value    
    else b.value    
    end    
  from jsonb_each($1) a full outer join jsonb_each($2) b using (key)    
  ) t;      
$$ language sql strict ;    
    
    
postgres=# select merge_json('{"price":[10000, "2018-01-01 10:10:11"], "newatt":[120, "2017-01-01 12:22:00"]}',  '{"price":[8880, "2018-01-04 10:10:12"], "count":[100, "2017-01-01 10:10:00"]}');    
                                                       merge_json                                                            
-------------------------------------------------------------------------------------------------------------------------    
 {"count": [100, "2017-01-01 10:10:00"], "price": [8880, "2018-01-04 10:10:12"], "newatt": [120, "2017-01-01 12:22:00"]}    
(1 row)    

触发器设计

触发器里面定义分类规则,例如这里对价格大于100的商品,吐出消息.

CREATE OR REPLACE FUNCTION notify1() returns trigger      
AS $function$      
declare      
begin      
  if jsonb_array_element(NEW.att->'price', 0)::text::float8 > 100 then   -- 规则1, 价格大于100,推送异步消息      
     perform pg_notify(      
       'a',    -- 异步消息通道名字      
       format('CLASS:high price, ID:%s, ATT:%s', NEW.id, NEW.att)   -- 消息内容      
     );      
  -- elsif ... then  其他规则      
  -- else  其他规则      
  end if;      
return null;      
end;      
$function$ language plpgsql strict;      

创建after insert or update触发器

create trigger tg1 after insert or update on a for each row execute procedure notify1();      

其他触发器(规则设计方法)

本文未使用

CREATE OR REPLACE FUNCTION notify1() returns trigger      
AS $function$      
declare      
begin      
  for key,value in select key, jsonb_array_element(value, 0)::text from jsonb_each(NEW.att)  -- 解析一次JSONB    
  loop    
    -- 规则处理    
    -- if key='price' then ...; end if;    
    -- if key='count' then ...; end if;    
  end loop;    
return null;    
end;    
$function$ language plpgsql strict;      
-- 动态规则表    
    
create table tbl_rule (    
  key text,  -- key值    
  exp text,  -- value 代入的表达式    
  class text,  -- 满足exp时,指向这个归类    
)    
    
CREATE OR REPLACE FUNCTION notify1() returns trigger      
AS $function$      
declare      
begin      
  for key,value in select key, jsonb_array_element(value, 0)::text from jsonb_each(NEW.att)  -- 解析一次JSONB    
  loop    
    -- 使用tbl_rule生成规则处理逻辑,动态    
  end loop;    
return null;    
end;    
$function$ language plpgsql strict;      

规则描述

json属性对中,value的类型可能很多,对应不同的规则语义。

1、文本 LIKE

2、数组 IN

3、等值

4、数值范围

5、时间范围

等等,在trigger的UDF中写规则即可。

数据合并写入测试

insert into a values       
  (1, '{"price":[10000, "2018-01-01 10:10:11"]}')       
  on conflict (id)       
  do update set       
  att = merge_json(a.att, excluded.att)  -- 合并新属性,保留老属性,需要使用一个UDF来合并      
  where       
  a.att <> merge_json(a.att, excluded.att);  -- 如果相等的概率很低,则可以去掉这个判断, 降低CPU开销   
    
    
postgres=# insert into a values    
  (1, '{"price":[1000, "2019-01-01 10:10:12"], "newatt": ["hello", "2018-01-01"]}')    
  on conflict (id)    
  do update set    
  att = merge_json(a.att, excluded.att)  -- 合并新属性,保留老属性,需要使用一个UDF来合并    
  where    
  a.att <> merge_json(a.att, excluded.att);   -- 如果相等的概率很低,则可以去掉这个判断, 降低CPU开销  
INSERT 0 1    
    
    
postgres=# select * from a;    
 id |                                     att                                         
----+-----------------------------------------------------------------------------    
  1 | {"price": [1000, "2019-01-01 10:10:12"], "newatt": ["hello", "2018-01-01"]}    
(1 row)    

监听消息

postgres=# listen a;      
LISTEN      
Asynchronous notification "a" with payload "ID:1, ATT:{"price": [10000, "2018-01-01 10:10:19"]}" received from server process with PID 51380.      

https://jdbc.postgresql.org/documentation/head/listennotify.html

其他

删除商品,可以使用DELETE触发器,告诉下游,比如商品已成交,删除。

CREATE OR REPLACE FUNCTION notify2() returns trigger      
AS $function$      
declare      
begin      
     perform pg_notify(      
       'a',                                                     -- 异步消息通道名字      
       format('CLASS:delete, ID:%s, ATT:%s', OLD.id, OLD.att)   -- 消息内容      
     );      
return null;      
end;      
$function$ language plpgsql strict;      
    
create trigger tg2 after delete on a for each row execute procedure notify2();      

方案二 - 流式批量消费

使用异步消息的方式,当连接中断时,重新连接后需要重新监听,并且在中断连接期间的消息会被丢弃掉。所以可靠性不佳。

另外,异步消息无法控制一次消费多少条,也不是特别友好。

所以我们实际上还有其他方法,持久化表,并且使用异步批量消费的方式进行消费。

性能指标:

CASE 数据量 并发 TPS 平均响应时间
流式处理 - 阅后即焚 - 消费 10亿,消费 395.2 万行/s 56 3952 14毫秒

结构沿用前面的例子,

1、新增一张结果表(也可以新增多张表,看业务量,通常一张够用了),

2、同时修改一下触发器内容,把notify改成写表,

3、修改客户端把监听通道改成异步消费SQL

DEMO

1、新增结果表

create table t_result(id serial8 primary key, class text, content text);    

2、触发器里面定义分类规则,例如这里对价格大于100的商品,吐出信息到结果表.

CREATE OR REPLACE FUNCTION notify1() returns trigger      
AS $function$      
declare      
begin      
  if jsonb_array_element(NEW.att->'price', 0)::text::float8 > 100 then   -- 规则1, 价格大于100,写入结果表      
     insert into t_result(class,content) values (    
       'a',    -- 归类    
       format('CLASS:high price, ID:%s, ATT:%s', NEW.id, NEW.att)   -- 消息内容      
     );    
  -- elsif ... then  其他规则      
  -- else  其他规则      
  end if;      
return null;      
end;      
$function$ language plpgsql strict;      

3、创建after insert or update触发器

create trigger tg1 after insert or update on a for each row execute procedure notify1();      

4、数据合并写入测试

insert into a values       
  (1, '{"price":[10000, "2018-01-01 10:10:11"]}')       
  on conflict (id)       
  do update set       
  att = merge_json(a.att, excluded.att)  -- 合并新属性,保留老属性,需要使用一个UDF来合并      
  where       
  a.att <> merge_json(a.att, excluded.att);   -- 如果相等的概率很低,则可以去掉这个判断, 降低CPU开销  
    
    
postgres=# insert into a values    
  (1, '{"price":[1000, "2019-01-01 10:10:12"], "newatt": ["hello", "2018-01-01"]}')    
  on conflict (id)    
  do update set    
  att = merge_json(a.att, excluded.att)  -- 合并新属性,保留老属性,需要使用一个UDF来合并    
  where    
  a.att <> merge_json(a.att, excluded.att); -- 如果相等的概率很低,则可以去掉这个判断, 降低CPU开销   
    
INSERT 0 1    
    
postgres=# select * from a;    
 id |                                     att                                         
----+-----------------------------------------------------------------------------    
  1 | {"price": [1000, "2019-01-01 10:10:12"], "newatt": ["hello", "2018-01-01"]}    
(1 row)    

5、异步批量消费结果表的内容(阅后即焚)

with a as (delete from t_result where ctid= any(array(     
  select ctid from t_result order by id limit 10 for update skip locked  -- 可以并发消费,不会相互堵塞,消费顺与写入顺序一致    
)) returning *)    
select * from a;    
 id | class |                                                 content                                                     
----+-------+---------------------------------------------------------------------------------------------------------    
  1 | a     | CLASS:high price, ID:1, ATT:{"price": [10000, "2018-01-01 10:10:11"]}    
  2 | a     | CLASS:high price, ID:1, ATT:{"price": [1000, "2019-01-01 10:10:12"], "newatt": ["hello", "2018-01-01"]}    
(2 rows)    
    
    
原子操作,阅后即焚,再次查询已消费完毕    
    
postgres=# select * from t_result;    
 id | class | content     
----+-------+---------    
(0 rows)    

方案二续 - 使用statement级触发器代替row级触发器

为什么建议使用statement级触发器代替row级触发器,参考:

《PostgreSQL 批量、单步 写入 - row, statement 触发器(中间表)、CTE 几种用法性能对比》

触发器函数修改如下

CREATE OR REPLACE FUNCTION notify1() returns trigger        
AS $function$        
declare        
begin        
  -- 规则1  
  insert into t_result(class,content) select   
    'a',    -- 归类    
    format('CLASS:high price, ID:%s, ATT:%s', id, att)   -- 消息内容     
  from new_table   
  where jsonb_array_element(att->'price', 0)::text::float8 > 100;    -- 规则1, 价格大于100,写入结果表    
    
  -- 其他规则  
  -- insert into t_result(class,content) select   
  -- ......  
  --   from new_table   
  -- where ...  -- 规则n  
    
  return null;        
end;        
$function$ language plpgsql strict;      

触发器修改如下

create trigger tg1 after insert on a REFERENCING NEW TABLE AS new_table for each STATEMENT execute procedure notify1();      
create trigger tg2 after update on a REFERENCING NEW TABLE AS new_table for each STATEMENT execute procedure notify1();      
postgres=# \d a  
                 Table "public.a"  
 Column |  Type   | Collation | Nullable | Default   
--------+---------+-----------+----------+---------  
 id     | integer |           | not null |   
 att    | jsonb   |           |          |   
Indexes:  
    "pk" PRIMARY KEY, btree (id)  
Triggers:  
    tg1 AFTER INSERT ON a REFERENCING NEW TABLE AS new_table FOR EACH STATEMENT EXECUTE PROCEDURE notify1()  
    tg2 AFTER UPDATE ON a REFERENCING NEW TABLE AS new_table FOR EACH STATEMENT EXECUTE PROCEDURE notify1()  

小结

使用异步消息,UDF,规则或触发器,非常轻量化的解决了实时计算的问题。

但是,异步消息是可能丢消息的,例如监听连接中断后,重连时,需要重新发起监听,并且中断连接时未消费的消息,不会再被消费,所以相当于丢消息了。

改进方法:

1、如果要保证不丢消息,可以将notify改成INSERT,把结果写入预先定义好的某个结果表,使用逻辑DECODE的方式,解析这个结果表相关的logical decode信息,从而获取变化量,参考如下。

《PostgreSQL pg_recvlogical 与 test_decoding 自定义,支持source table filter, 对接kafka,es等》

2、使用阅后即焚的方法,类似本方案2.

《阿里云RDS PostgreSQL varbitx实践 - 流式标签 (阅后即焚流式批量计算) - 万亿级,任意标签圈人,毫秒响应》

《HTAP数据库 PostgreSQL 场景与性能测试之 32 - (OLTP) 高吞吐数据进出(堆存、行扫、无需索引) - 阅后即焚(JSON + 函数流式计算)》

《HTAP数据库 PostgreSQL 场景与性能测试之 31 - (OLTP) 高吞吐数据进出(堆存、行扫、无需索引) - 阅后即焚(读写大吞吐并测)》

《HTAP数据库 PostgreSQL 场景与性能测试之 27 - (OLTP) 物联网 - FEED日志, 流式处理 与 阅后即焚 (CTE)》

《在PostgreSQL中实现update | delete limit - CTID扫描实践 (高效阅后即焚)》

参考

https://www.postgresql.org/docs/11/static/functions-json.html

https://www.postgresql.org/docs/11/static/datatype-json.html

https://jdbc.postgresql.org/documentation/head/listennotify.html

https://www.postgresql.org/docs/11/static/sql-notify.html

https://www.postgresql.org/docs/11/static/sql-listen.html

https://www.postgresql.org/docs/11/static/sql-unlisten.html

https://www.postgresql.org/docs/11/static/libpq-notify.html

https://www.postgresql.org/docs/11/static/sql-notify.html#id-1.9.3.157.7.5

https://www.postgresql.org/docs/11/static/functions-info.html

https://www.postgresql.org/docs/11/static/plpgsql-trigger.html

https://github.com/impossibl/pgjdbc-ng

https://www.openmakesoftware.com/postgresql-listen-notify-events-example/

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
相关文章
|
8月前
|
关系型数据库 物联网 PostgreSQL
沉浸式学习PostgreSQL|PolarDB 11: 物联网(IoT)、监控系统、应用日志、用户行为记录等场景 - 时序数据高吞吐存取分析
物联网场景, 通常有大量的传感器(例如水质监控、气象监测、新能源汽车上的大量传感器)不断探测最新数据并上报到数据库. 监控系统, 通常也会有采集程序不断的读取被监控指标(例如CPU、网络数据包转发、磁盘的IOPS和BW占用情况、内存的使用率等等), 同时将监控数据上报到数据库. 应用日志、用户行为日志, 也就有同样的特征, 不断产生并上报到数据库. 以上数据具有时序特征, 对数据库的关键能力要求如下: 数据高速写入 高速按时间区间读取和分析, 目的是发现异常, 分析规律. 尽量节省存储空间
616 1
|
8月前
|
人工智能 关系型数据库 Serverless
阿里函数计算FC、文件存储NAS和RDS PostgreSQL的应用体验报告
本次体验的目的,旨在详细介绍如何通过阿里函数计算FC部署ChatGLM6B大语言模型,并借助文件存储NAS和RDS PostgreSQL搭建一个AI知识库问答应用,以实现PDF、TXT、HTML等文件和URL类型资料的轻松读取和处理。
246 62
|
5月前
|
SQL 关系型数据库 C语言
PostgreSQL【应用 03】Docker部署的PostgreSQL扩展SQL之C语言函数(编写、编译、载入)计算向量余弦距离实例分享
PostgreSQL【应用 03】Docker部署的PostgreSQL扩展SQL之C语言函数(编写、编译、载入)计算向量余弦距离实例分享
49 0
|
5月前
|
SQL 关系型数据库 数据库
PostgreSQL【应用 02】扩展SQL之C语言函数(编写、编译、载入)实例分享
PostgreSQL【应用 02】扩展SQL之C语言函数(编写、编译、载入)实例分享
55 0
|
5月前
|
关系型数据库 数据库 PostgreSQL
PostgreSQL【应用 01】使用Vector插件实现向量相似度查询(Docker部署的PostgreSQL安装pgvector插件说明)和Milvus向量库对比
PostgreSQL【应用 01】使用Vector插件实现向量相似度查询(Docker部署的PostgreSQL安装pgvector插件说明)和Milvus向量库对比
209 1
|
5月前
|
关系型数据库 数据库 PostgreSQL
Docker【应用 03】给Docker部署的PostgreSQL数据库安装PostGIS插件(安装流程及问题说明)
Docker【应用 03】给Docker部署的PostgreSQL数据库安装PostGIS插件(安装流程及问题说明)
163 0
|
7月前
|
存储 关系型数据库 数据库
《PostgreSQL物化视图:创建、维护与应用》
《PostgreSQL物化视图:创建、维护与应用》
50 0
|
7月前
|
存储 JSON 关系型数据库
《PostgreSQL中的JSON处理:技巧与应用》
《PostgreSQL中的JSON处理:技巧与应用》
61 0
|
8月前
|
人工智能 关系型数据库 Serverless
探索AI知识库问答应用:函数计算与RDS PostgreSQL的奇妙融合
随着技术的飞速发展,AI大语言模型成为了当今科技界的一颗璀璨明星。我有幸跟随老陈的引导,踏入了基于函数计算(FC)和RDS PostgreSQL的AI知识库问答应用的世界。这次的探索让我深切感受到了云计算和人工智能的结合,以及它们如何塑造着未来的技术格局。
192 0
|
8月前
|
关系型数据库 分布式数据库 数据库
沉浸式学习PostgreSQL|PolarDB 8: 电商|短视频|新闻|内容推荐业务(根据用户行为推荐相似内容)、监控预测报警系统(基于相似指标预判告警)、音视图文多媒体相似搜索、人脸|指纹识别|比对 - 向量搜索应用
1、在电商业务中, 用户浏览商品的行为会构成一组用户在某个时间段的特征, 这个特征可以用向量来表达(多维浮点数组), 同时商品、店铺也可以用向量来表达它的特征. 那么为了提升用户的浏览体验(快速找到用户想要购买的商品), 可以根据用户向量在商品和店铺向量中进行相似度匹配搜索. 按相似度来推荐商品和店铺给用户. 2、在短视频业务中, 用户浏览视频的行为, 构成了这个用户在某个时间段的兴趣特征, 这个特征可以用向量来表达(多维浮点数组), 同时短视频也可以用向量来表达它的特征. 那么为了提升用户的观感体验(推荐他想看的视频), 可以在短视频向量中进行与用户特征向量的相似度搜索.
234 0

相关产品

  • 云原生数据库 PolarDB
  • http://www.vxiaotou.com