道之所向,吾之所往

0%

SpringBoot redis系列 -布隆过滤器(4)

背景

给用户推荐文章阅读,需要知道用户有没有看过该文章,看过了不需要再次推荐,如何去去重呢,TyperLogLog是可以做到去重,但是它无法知道某个用户是否看过了,如果把看过的用户和文章写到一个Set集合,那随着业务增长,带来的内存开销是巨大的,这个时候布隆过滤器的作用就出来了。它就是用来处理这种去重问题的,它去重的同时还能节约90%的空间,只是它不是那么精准,存在一点误差,像推荐的业务,我们平时刷抖音,今日头条也会碰到推荐了重复的,所以我猜测他们的推荐可能也是基于布隆过滤器。

ps:不同的数据结构有不同的适用场景和优缺点,你需要仔细权衡自己的需求之后妥善适用它们,布隆过滤器就是践行这句话的代表。如果知道业务的走向,无法到达膨大的用户量,其实也没必要使用复杂的结构

使用场景

  • 推荐去重
  • 爬虫url地址去重
  • 垃圾邮件过滤

9150e4e5ly1flnj4ymahcg208c08cdfx.gif

正文

布隆过滤器简介

布隆过滤器可以理解为一个不怎么精准的Set集合,使用contains 方法判断某个对象是否存在,可能会存在误判的情况,但是只要参数设置合理,精确度可以控制到相对足够精确,只会有小小的误判。

布隆过滤器只会误判没有见过的元素,不会误判见过的元素,因为可能跟它见过的元素比较像。

Redis中布隆过滤器在Redis4.0提供了插件功能之后才正式出现,要安装布隆过滤器插件才可以用,直接用docker装了一个,并且给他设置密码,我开始没有设置密码,结果第二天就中了挖矿程序,建议大家服务器上的Redis对公网开放端口的设置一个密码

1
docker run -p 6379:6379 --name redis-redisbloom  redislabs/rebloom:latest

布隆过滤器基本指令:

  • bf.add 添加元素

  • bf.exists元素是否存在

  • bf.madd 添加多个元素

  • bf.mexists 多个元素是否存在

    image-20200107180904561.png

    返回1代表存在 0代表不存在

使用bf.add 命令会创建一个默认的布隆过滤器,它的精确度可能不是那么好,我们通过调参使它的精确度提高。使用bf.reserve在add之前创建,它有三个参数,分别是key, error_rateinitial_size,错误率error_rate越低,需要的空间越大,initial_size表示预计放入的元素数量,当超出这个数量时,误判率会上升。默认值error_rate0.01 initial_size100

注意:initial_size过大会浪费内存,过大会正确率变低,在使用之前一定要估计用户量,加上一定的冗余空间避免低估。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33



**原理简介:**

布隆过滤器组成就是一个大型的位数组和几个不一样的无偏Hash函数,无偏就是能把hash算的比较均匀。

hash算法是可能不同key重复的,所以采用多个hash函数。添加key时,会用多个Hash函数对key进行Hash计算一个整数索引,多个hash函数就会有多个索引,把这个这些索引上的值都设置成1,判断是否存在时,判断这些索引上的值是否都是1,如果都是1,就认为存在,这样避免了Hash重复。







### SpringBoot 中使用布隆过滤器

找了下文档,似乎SpringBoot Data Redis 并未对布隆过滤器支持,参考Redis官网的介绍,我们给其增加对布隆过滤器的支持。

redis 官方布隆过滤器介绍:

https://oss.redislabs.com/redisbloom/

redis bloom java sdk:

https://github.com/RedisBloom/JRedisBloom

我们先观察下Redis **RedisTemplate.opsForValue()** 的操作String实现,模仿它写一个布隆过滤器操作,进入方法内部。

```java
public ValueOperations<K, V> opsForValue() {
return this.valueOps;
}

ValueOperations 是一个接口,定义了操作string的一些常用命令,如 set get

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface ValueOperations<K, V> {
void set(K var1, V var2);

void set(K var1, V var2, long var3, TimeUnit var5);

default void set(K key, V value, Duration timeout) {
Assert.notNull(timeout, "Timeout must not be null!");
if (TimeoutUtils.hasMillis(timeout)) {
this.set(key, value, timeout.toMillis(), TimeUnit.MILLISECONDS);
} else {
this.set(key, value, timeout.getSeconds(), TimeUnit.SECONDS);
}

}
}

我们顺腾摸瓜,看看它的实现,有个默认的实现DefaultValueOperations,可以看到它集成于AbstractOperations抽象类,提供了一系列方法,如获取RedisTemplate配置,序列化key,value等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package org.springframework.data.redis.core;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import org.springframework.data.geo.GeoResults;
import org.springframework.data.redis.connection.DefaultTuple;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisGeoCommands.GeoLocation;
import org.springframework.data.redis.connection.RedisZSetCommands.Tuple;
import org.springframework.data.redis.connection.convert.Converters;
import org.springframework.data.redis.core.ZSetOperations.TypedTuple;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.SerializationUtils;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

abstract class AbstractOperations<K, V> {


abstract class ValueDeserializingRedisCallback implements RedisCallback<V> {
private Object key;

public ValueDeserializingRedisCallback(Object key) {
this.key = key;
}

/**
* 抽象回调接口 拿到连接 RedisConnection 执行命令
*/
public final V doInRedis(RedisConnection connection) {
byte[] result = this.inRedis(AbstractOperations.this.rawKey(this.key), connection);
return AbstractOperations.this.deserializeValue(result);
}

@Nullable
protected abstract byte[] inRedis(byte[] var1, RedisConnection var2);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
class DefaultValueOperations<K, V> extends AbstractOperations<K, V> implements ValueOperations<K, V> {
DefaultValueOperations(RedisTemplate<K, V> template) {
super(template);
}

public V get(Object key) {
return this.execute(new AbstractOperations<K, V>.ValueDeserializingRedisCallback(key) {
protected byte[] inRedis(byte[] rawKey, RedisConnection connection) {
return connection.get(rawKey);
}
}, true);
}

追了一下get方法,execute继承于AbstractOperations,调用了RedisTemplate的方法,并传入了回调类,这个方法主要就是根据配置获取一个RedisConnection 实例执行命令。

1
2
3
4
@Nullable
<T> T execute(RedisCallback<T> callback, boolean exposeConnection) {
return this.template.execute(callback, exposeConnection);
}

拿到回调之后就开始执行命令了, connection.set 方法就是在执行指令,我们追源码发现RedisConnection继承于RedisCommands,它提供了执行命令的操作。就是它了,用它来执行就命令就ok了。具体Redistemplate原理可以参考:https://blog.csdn.net/junchenbb0430/article/details/51104335

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected byte[] inRedis(byte[] rawKey, RedisConnection connection) {
connection.set(rawKey, rawValue);
return null;
}

public interface RedisConnection extends RedisCommands {
default RedisGeoCommands geoCommands() {
return this;
}


public interface RedisCommands extends RedisKeyCommands,...{
@Nullable
Object execute(String var1, byte[]... var2);
}

我们根据套路,先创建BloomOperations定义操作方法,DefaultBloomOperations实现,封装一个常量存放关键字命令。注意:AbstractOperations使用默认修饰符,也就是说同包下面才能访问,所以我们的包名必须跟它一致,org.springframework.data.redis.core

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package org.springframework.data.redis.core;

import lombok.AllArgsConstructor;
import lombok.Getter;

@Getter
@AllArgsConstructor
public enum BloomCommand {

RESERVE("BF.RESERVE"),
ADD("BF.ADD"),
MADD("BF.MADD"),
EXISTS("BF.EXISTS"),
MEXISTS("BF.MEXISTS");

private String command;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package org.springframework.data.redis.core;

public interface BloomOperations<K, V> {

/**
* 创建一个布隆过滤器
*
* @param: key
* @param: errorRate 错误比例
* @param: initCapacity 初始大小
*/
void createFilter(K key, double errorRate, long initCapacity);

/**
* 添加元素
*
* @param: key
* @param: value
* @return:
*/
Boolean add(K key, V value);

/**
* 批量添加
*
* @param: key
* @param: values
* @return:
*/
Boolean[] addMulti(K key, V... values);

/**
* 判断是否存在
*
* @param: key
* @param: value
* @return:
*/
Boolean exists(K key, V value);

/**
* 批量检查元素是否存在
*
* @param: key
* @param: values
*/
Boolean[] existsMulti(K key, V... values);

/**
* 删除布隆过滤器
*
* @param: key
* @return:
*/
Boolean delete(K key);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package org.springframework.data.redis.core;

import java.util.List;
import java.util.Objects;

class DefaultBloomOperations<K, V> extends AbstractOperations<K, V> implements BloomOperations<K, V> {

public DefaultBloomOperations(RedisTemplate<K, V> template) {
super(template);
}

@Override
public void createFilter(K key, double errorRate, long initCapacity) {
byte[] rawKey = rawKey(key);
byte[] rawErrorRate = rawString(String.valueOf(errorRate));
byte[] rawInitCapacity = rawString(String.valueOf(initCapacity));
this.execute(connection -> {
connection.execute(BloomCommand.RESERVE.getCommand(), rawKey, rawErrorRate, rawInitCapacity);
return null;
}, true);
}

@Override
public Boolean add(K key, V value) {
byte[] rawKey = rawKey(key);
byte[] rawValue = rawValue(value);
return this.execute(connection -> {
Long l = (Long) connection.execute(BloomCommand.ADD.getCommand(), rawKey, rawValue);
return Objects.equals(l, 1L);
}, true);
}


@Override
public Boolean[] addMulti(K key, V... values) {
byte[][] rawArgs = rawArgs(key, values);
return this.execute(connection -> {
List<Long> ls = (List<Long>) connection.execute(BloomCommand.MADD.getCommand(), rawArgs);
return ls.stream().map(l -> Objects.equals(l, 1L)).toArray(Boolean[]::new);
}, true);
}

@Override
public Boolean exists(K key, V value) {
byte[] rawKey = rawKey(key);
byte[] rawValue = rawValue(value);
return this.execute(connection -> {
Long l = (Long) connection.execute(BloomCommand.EXISTS.getCommand(), rawKey, rawValue);
return Objects.equals(l, 1L);
}, true);
}

@Override
public Boolean[] existsMulti(K key, V... values) {
byte[][] rawArgs = rawArgs(key, values);
return this.execute(connection -> {
List<Long> ls = (List<Long>) connection.execute(BloomCommand.MEXISTS.getCommand(), rawArgs);
return ls.stream().map(l -> Objects.equals(l, 1L)).toArray(Boolean[]::new);
}, true);
}

@Override
public Boolean delete(K key) {
return template.delete(key);
}

private byte[][] rawArgs(Object key, Object... values) {
byte[][] rawArgs = new byte[1 + values.length][];

int i = 0;
rawArgs[i++] = rawKey(key);

for (Object value : values) {
rawArgs[i++] = rawValue(value);
}

return rawArgs;
}
}

最后发现SpringBoot2使用的是lettuce,它没有对布隆过滤器的支持,所以改用Jedis

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis-bloom-filter</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.2.0</version>
</dependency>

最后开始测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Slf4j
@SpringBootTest
public class BloomFilterTests {


@Autowired
BloomOperations bloomOperations;

@Test
public void test() {
//创建过滤器
bloomOperations.createFilter("bloom_filter", 0.1, 2000);
//添加元素
for (int i = 1; i < 200; i++) {
bloomOperations.add("bloom_filter", String.valueOf(i));
}
//批量添加
bloomOperations.addMulti("bloom_filter", "2000", "2001", "2003");
//是否存在
log.info("{}是否存在:{}", 2000, bloomOperations.exists("bloom_filter", "2000"));
log.info("{}是否存在:{}", 20005, bloomOperations.exists("bloom_filter", "20005"));
log.info("{}是否存在:{}", 20005 + "" + 2001, bloomOperations.existsMulti("bloom_filter", "20005", "2001"));
}
}

总结

布隆过滤器使用起来有点麻烦,不过效果还是很香的,它是一个逻辑存在的结构,在很多NoSQl数据库中都有应用。

以上内容个人学习总结,部分内容来源于网上,欢迎各位大佬指点。

代码连接地址:https://github.com/smalljop/my-blog-java-project

SpringBoot redis系列 -HyperLogLog (3)

背景

做了个活动H5页,产品希望能拿到该页面的大致访问人数,知道这个功能受欢迎程度,世面上虽然有很多成型的打点工具,接入也很方便,但是抱着自己学习下的想法,决定自己开发一下,最终选型在HyperLogLog,其实用set集合存储当天访问的Ip就可以,但是当用户量大,统计页多的话,带来的内存开销是很大的,虽然这些场景我都不存在,抱着学习的心态,亦然使用了HyperLogLog。

使用场景

  • 统计注册 IP 数
  • 网页UV数
  • 在线用户数

正文

HyperLogLog简介

  • HyperLogLog是一种算法,并非redis独有
  • 具有去重复特性
  • 做基数统计,不存储元素本身,无法拿到具体存储元素
  • 需要占用12k的存储空间(redis有优化,采用稀疏矩阵存储,只有当超过稀疏矩阵阈值时,才会变成稠密矩阵,占用12k内存),如果统计对象过多对空间的占用是不小的
  • 基数估计的结果是一个带有 0.81% 标准错误,不能保证数据绝对精准

springboot使用

封装操作工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package com.smalljop.redis.example.hyperloglog;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

/**
* @description:
* @author: smalljop
* @create: 2020-01-06 11:38
**/
@Component
@Slf4j
@AllArgsConstructor
public class HyperLogLogService {

private final StringRedisTemplate redisTemplate;


/**
* 添加值
* pfadd 指令
*
* @param key
* @param values
*/
public void add(String key, String... values) {
redisTemplate.opsForHyperLogLog().add(key, values);
}


/**
* 统计数量
* pfcount 指令
*
* @param key
*/
public Long count(String key) {
return redisTemplate.opsForHyperLogLog().size(key);
}


/**
* 合并 用于将多个 pf 计数值累加在一起形成一个新的 pf 值
* pfmerge
*
* @param keys
*/
public void merge(String key1, String... keys) {
redisTemplate.opsForHyperLogLog().union(key1, keys);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.smalljop.redis.example.hyperloglog;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.stereotype.Component;

import java.util.concurrent.ThreadLocalRandom;

import static org.junit.jupiter.api.Assertions.*;

@Slf4j
@SpringBootTest
class HyperLogLogServiceTest {

@Autowired
HyperLogLogService hyperLogLogService;

@Test
void test() {
String key = "HyperLogLog";
//处理化数组 模拟用户
String[] arr = new String[]{"1", "2", "3", "4", "5"};
// 循环一百次 模拟一百次访问
for (int i = 0; i < 1000; i++) {
int anInt = ThreadLocalRandom.current().nextInt(5);
hyperLogLogService.add(key, arr[anInt]);
}
//统计数量
Long count = hyperLogLogService.count(key);
log.info("统计数:{}", count);
}

}

总结

使用起来还是很简单的,算法比较复杂,没有深入研究,小规模没必要用,作为一个面试加分项吧

SpringBoot redis系列 -位图(2)

背景

需求

公司做的项目动感单车,有一个统计用户连续骑行天数的需求,如果把每天是否骑行状态放到数据库,那么每个用户一年就是365条记录,网上找了下有没有更好的解决方案,发现Redis中BitMap可以比较优雅的解决这个问题。

每天骑行状态占一个位(bit),一年也就46个字节而已,存储的空间大幅度省下,同时查找时间复杂度是O(1),运算速度也是比较快的。

使用场景

每日签到

日活跃用户

有多个布尔值的情况下就可以考虑用

9150e4e5ly1fkbxbrl7iej206y06w0sn.jpg

正文

BitMap(位图)

位图不是不是一种特殊的数据结构,他的内容就是一个普通字符串,用来存储的数据结构也就是string,我们可以使用get/set命令操作byte数组也可以使用redis提供的getbit/setbit命令把string当做位数组处理,通过位偏移量(元素下标)拿到值。

image-20200104163039028.png

BitMap跟java中的BitSet集合差不多,一串连续的二进制数字,0就标识false,1就标识true。我们把时间作为偏移量,然后根据下标拿到bit,然后就能得到该天是否骑行的状态(true/false)。大致如下:

image-20200104164440423.png

如第一个偏移量为0 就表示1月4号 ,以此类推

springboot中使用

我们就提出几个需求,然后围绕这个需求实现

  1. 获取用户当天是否骑行
  2. 用户第一次骑行时间
  3. 获取用户累计骑行天数

先封装一个操作位图的类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package com.smalljop.redis.example.bitmap;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

/**
* @description: 位图操作类
* @author: smalljop
* @create: 2020-01-04 13:17
**/
@Component
@Slf4j
@AllArgsConstructor
public class BitMapService {
private final StringRedisTemplate redisTemplate;


/**
* 保存
*
* @param key 位图key
* @param index 下标
* @param bit 0 false 1 true
* @return
*/
public Boolean setBit(String key, Long index, Boolean bit) {
return redisTemplate.execute((RedisCallback<Boolean>) con -> con.setBit(key.getBytes(), index, bit));
}

/**
* 获取值
*
* @param key 位图key
* @param index 下标
* @return
*/
public Boolean getBit(String key, Long index) {
return redisTemplate.execute((RedisCallback<Boolean>) con -> con.getBit(key.getBytes(), index));
}


/**
* 用来查找指定范围内出现的第一个 0 或 1
* @param key
* @return
*/
public Long bitPos(String key,Boolean bit){
return redisTemplate.execute((RedisCallback<Long>) con -> con.bitPos(key.getBytes(), bit));
}

/**
* 统计次数 只统计为为1的个数
*
* @param key 位图key
* @return
*/
public Long bitCount(String key) {
return redisTemplate.execute((RedisCallback<Long>) con -> con.bitCount(key.getBytes()));
}


/**
* 统计下标范围内次数 只统计为为1的个数
*
* @param key 位图key
* @return
*/
public Long bitCount(String key, Long start, Long end) {
return redisTemplate.execute((RedisCallback<Long>) con -> con.bitCount(key.getBytes(), start, end));
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package com.smalljop.redis.example.bitmap;

import com.smalljop.redis.example.queue.MessageProvider;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.event.annotation.AfterTestClass;
import org.springframework.test.context.event.annotation.BeforeTestClass;

import javax.annotation.PostConstruct;
import java.time.LocalDate;
import java.time.temporal.TemporalAdjusters;
import java.util.Random;

@Slf4j
@SpringBootTest
class BitMapServiceTest {


@Autowired
private BitMapService bitMapService;

//因为下标是从0开始的 所以我们需要约定一个0
// 或者你也可以不从0开始 直接以天数20200104这种作为下标 那之前的下标会用0补齐
// 同时会带来字节的浪费 但是使用方便
// 是从哪一天开始 比如从从开发的那天 我已 2020 01-01号为例
LocalDate localDate = LocalDate.of(2020, 01, 01);

// 算距离1970-01-01有多少天 以这天为起点 之后所有时间减去起点 如 18262 就是我的起点
Long epochDay = localDate.toEpochDay();


//我们为每个人维护一个bitmap 作为key
String key = "user:1:riding:days";

/**
* 初始化一批数据
*/
@Test
void init() {
//模拟从今天开始的之后一百天
Random random = new Random();
for (int i = 0; i < 100; i++) {
Long index = localDate.plusDays(i).toEpochDay() - epochDay;
//随机模拟 true和false
boolean aBoolean = random.nextBoolean();
bitMapService.setBit("user:1:riding:days", index, aBoolean);
log.info("初始化第{}条数据", i);
}
}


@Test
void setBit() {
bitMapService.setBit("testBitmap", 0L, true);
}

@Test
void getBit() {
bitMapService.getBit("testBitmap", 0L);
}

@Test
void test1() {
//保存今天已经骑行状态
long today = LocalDate.now().toEpochDay() - epochDay;
//查询是否骑行
Boolean bit = bitMapService.getBit(key, today);
log.info("今天是否骑行:{}", bit);
//保存骑行状态
bitMapService.setBit(key, today, true);
bit = bitMapService.getBit(key, today);
log.info("今天是否骑行:{}", bit);
}

@Test
void test2() {
LocalDate now = LocalDate.now();
long today = now.toEpochDay() - epochDay;
// 1. 获取用户当天是否骑行
Boolean bit = bitMapService.getBit(key, today);
log.info("今天是否骑行:{}", bit);
//获取当月天数 开始天数和结束天数
LocalDate startTime = now.with(TemporalAdjusters.firstDayOfMonth());
LocalDate endTime = now.with(TemporalAdjusters.lastDayOfMonth());
System.out.println((startTime.toEpochDay() - epochDay) + "----" + (endTime.toEpochDay() - epochDay));
// 3.用户第一次骑行时间
Long day = bitMapService.bitPos(key, true);
//减去时间差
LocalDate localDate = now.plusDays(today - day);
log.info("第一次骑行时间:{}", localDate);
// 4. 获取用户累计骑行天数
Long count = bitMapService.bitCount(key);
log.info("累计骑行天数:{}", count);
}

}

总结

使用BitMap需要考虑场景,以用户为维度还是以时间维度,像我目前就是以用户为维度,如果有个需求是统计今天有多少个用户,那我目前就很困难了,所以BitMap不是万灵药,但是如果刚好有固定的场景,使用BitMap确实是一个很好的解决方案,极大减少数据存储成本,扩容成本,以及运算成本。复杂场景使用起来有点难,最终我放弃了。。打扰了。

代码连接地址:https://github.com/smalljop/my-blog-java-project

稿定设计导出-20200104-184606.gif

SpringBoot redis系列 -延时队列(1)

背景

PS:笔者所在公司目前业务比较简单,相对项目架构也比较简单,暂时未有引入MQ等消息中间件,但是某天突然收到一个需求,需要在用户关注了我们的公众号之后,延迟几秒钟给用户在发送几条消息。最初考虑用要不DelayQueue或者定时线程池ScheduledThreadPoolExecutor走一波?感觉都不够优雅,刚好项目中有用到Redis,干脆就用Redis做个延时队列,也方便以后复用。当然,Redis实现的队列不是专业的MQ 对消息可靠性有高度要求的话,并不建议使用。比较简单的业务场景下还是可以用来异步延时解耦的。

9150e4e5gy1g9cxmqsitfj2073073wef.jpg

正文

延迟队列可以通过Zset(有序列表实现),Zset类似于java中SortedSet和HashMap的结合体,它是一个Set结构,保证了内部value值的唯一,同时他还可以给每个value设置一个score作为排序权重,Redis会根据score自动排序,我们每次拿到的就是最先需要被消费的消息,利用这个特性我们可以很好实现延迟队列。

1
2
3
4
5
6
7
8
9
10
11
12
spring:
application:
name: redis-example
redis:
host: localhost
port: 6379
# redis有16个库 默认选择第0个使用
database: 0
password:
# 端口给个0 代表随机选择一个未被使用端口
server:
port: 0

封装一个统一的Message类,方便统一管理所有延迟消息格式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.smalljop.redis.example.queue;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.time.LocalDateTime;

/**
* @description: 消息统一封装类
* @author: smalljop
* @create: 2020-01-03 10:20
**/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Message {
/**
* 消息唯一标识
*/
private String id;
/**
* 消息渠道 如 订单 支付 代表不同业务类型
* 为消费时不同类去处理
*/
private String channel;
/**
* 具体消息 json
*/
private String body;

/**
* 延时时间 被消费时间 取当前时间戳+延迟时间
*/
private Long delayTime;

/**
* 创建时间
*/
private LocalDateTime createTime;
}

封装一个延时队列工具类 负责维护队列 提供常用操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package com.smalljop.redis.example.queue;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
* @description: 延时队列功能类
* @author: smalljop
* @create: 2020-01-03 10:11
**/
@Component
@Slf4j
@AllArgsConstructor
public class DelayingQueueService {

private static ObjectMapper mapper = Jackson2ObjectMapperBuilder.json().build();

private final StringRedisTemplate redisTemplate;

/**
* 可以不同业务用不同的key
*/
public static final String QUEUE_NAME = "message:queue";


/**
* 插入消息
*
* @param message
* @return
*/
@SneakyThrows
public Boolean push(Message message) {
Boolean addFlag = redisTemplate.opsForZSet().add(QUEUE_NAME, mapper.writeValueAsString(message), message.getDelayTime());
return addFlag;
}

/**
* 移除消息
*
* @param message
* @return
*/
@SneakyThrows
public Boolean remove(Message message) {
Long remove = redisTemplate.opsForZSet().remove(QUEUE_NAME, mapper.writeValueAsString(message));
return remove > 0 ? true : false;
}


/**
* 拉取最新需要
* 被消费的消息
* rangeByScore 根据score范围获取 0-当前时间戳可以拉取当前时间及以前的需要被消费的消息
*
* @return
*/
public List<Message> pull() {
Set<String> strings = redisTemplate.opsForZSet().rangeByScore(QUEUE_NAME, 0, System.currentTimeMillis());
if (strings == null) {
return null;
}
List<Message> msgList = strings.stream().map(msg -> {
Message message = null;
try {
message = mapper.readValue(msg, Message.class);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return message;
}).collect(Collectors.toList());
return msgList;
}


}

接下来写一个消息提供者,创建消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.smalljop.redis.example.queue;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.UUID;

/**
* @description: 消息提供者
* @author: smalljop
* @create: 2020-01-03 10:44
**/
@Component
@Slf4j
@AllArgsConstructor
public class MessageProvider {

private final DelayingQueueService delayingQueueService;

private static String USER_CHANNEL = "USER_CHANNEL";

/**
* 发送消息
*
* @param messageContent
*/
public void sendMessage(String messageContent, long delay) {
try {
if (messageContent != null) {
String seqId = UUID.randomUUID().toString();
Message message = new Message();
//时间戳默认为毫秒 延迟5s即为 5*1000
long time = System.currentTimeMillis();
LocalDateTime dateTime = Instant.ofEpochMilli(time).atZone(ZoneOffset.ofHours(8)).toLocalDateTime();
message.setDelayTime(time + (delay * 1000));
message.setCreateTime(dateTime);
message.setBody(messageContent);
message.setId(seqId);
message.setChannel(USER_CHANNEL);
delayingQueueService.push(message);
}
} catch (Exception e) {
e.printStackTrace();
}
}

}

再创建一个消息消费者,定时轮训去拉取队列中的消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package com.smalljop.redis.example.queue;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.util.List;
import java.util.Set;

/**
* @description: 延迟队列消费
* @author: smalljop
* @create: 2020-01-03 10:51
**/
@Component
@Slf4j
@AllArgsConstructor
public class MessageConsumer {
private static ObjectMapper mapper = Jackson2ObjectMapperBuilder.json().build();
private final DelayingQueueService delayingQueueService;

/**
* 定时消费队列中的数据
* zset会对score进行排序 让最早消费的数据位于最前
* 拿最前的数据跟当前时间比较 时间到了则消费
*/
@Scheduled(cron = "*/1 * * * * *")
public void consumer() throws JsonProcessingException {
List<Message> msgList = delayingQueueService.pull();
if (null != msgList) {
long current = System.currentTimeMillis();
msgList.stream().forEach(msg -> {
// 已超时的消息拿出来消费
if (current >= msg.getDelayTime()) {
try {
log.info("消费消息:{}:消息创建时间:{},消费时间:{}", mapper.writeValueAsString(msg), msg.getCreateTime(), LocalDateTime.now());
} catch (JsonProcessingException e) {
e.printStackTrace();
}
//移除消息
delayingQueueService.remove(msg);
}
});
}
}


}

使用springboot的定时需要再启动类上加上开启定时注解

1
2
3
4
5
6
7
8
9
10
@SpringBootApplication
//打开定时
@EnableScheduling
public class RedisExampleApplication {

public static void main(String[] args) {
SpringApplication.run(RedisExampleApplication.class, args);
}

}

创建测试类,进行测试 延迟20s消费消息

1
2
3
4
5
6
7
8
9
10
11
12
@SpringBootTest
class MessageProviderTest {

@Autowired
private MessageProvider messageProvider;

@Test
void sendMessage() {
messageProvider.sendMessage("同时发送消息1", 20);
messageProvider.sendMessage("同时发送消息2", 20);
}
}

image-20200103115839751.png

1
2
2020-01-03 11:58:18.003  INFO 102328 --- [   scheduling-1] c.s.redis.example.queue.MessageConsumer  : 消费消息:{"id":"cf2cd12e-a8f0-4e04-b92f-511b033ba1bb","channel":"USER_CHANNEL","body":"同时发送消息1","delayTime":1578023897393,"createTime":[2020,1,3,11,57,57,393000000]}:消息创建时间:2020-01-03T11:57:57.393,消费时间:2020-01-03T11:58:18.003
2020-01-03 11:58:19.002 INFO 102328 --- [ scheduling-1] c.s.redis.example.queue.MessageConsumer : 消费消息:{"id":"7c220af3-9215-4cf0-82cb-a01063c2b5db","channel":"USER_CHANNEL","body":"同时发送消息2","delayTime":1578023898778,"createTime":[2020,1,3,11,57,58,778000000]}:消息创建时间:2020-01-03T11:57:58.778,消费时间:2020-01-03T11:58:19.002

小结

延时队列一个实现大约就是这样子了,redis毕竟不是专业的MQ,如果是比较严谨的场景建议还是不要使用redis实现队列。代码连接地址:https://github.com/smalljop/my-blog-java-project

稿定设计导出-20200103-120507.gif

新的一年,新的开始,继续砥砺前行,弄个博客记录自己工作和学习过程中的问题。希望自己能坚持写下去。