Toutiao app

Demo

[Four demo screenshots omitted: image-20210709140539138, image-20210419152011931.png, image-20210426210402672.png, image-20210427014828830.png]

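Tech stack

[Image omitted: f3accd2ba01c41b0a9ac98370241eba3.png]

  • Spring Cloud Gateway: the gateway deployed in front of the microservices. It routes API requests to the registered services and natively supports common architectural measures such as rate limiting and circuit breaking.
  • Spring Boot for rapid project setup, combined with the Spring Cloud suite, to build the back-end microservices: user center, we-media (self-publishing), admin center, and so on.
  • Spring Cloud Alibaba Nacos as the service registry and configuration center.
  • mybatis-plus as the persistence layer to speed up development.
  • Kafka for internal system notifications, notifications to client systems, and real-time data computation.
  • Redis caching for computing hot data and improving system performance.
  • MySQL for storing user data, to keep upper-layer queries fast.
  • MongoDB for storing users' hot data, to keep that data scalable and fast to access.
  • FastDFS as the static resource store, with caching and eviction of hot static resources built on top of it.
  • HBase for storing the system's cold data, to keep system data reliable.
  • Elasticsearch (ES) for indexing cold data and article data, so that both can be queried quickly.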

Stage 1: viewing articles in the app, static pages with FreeMarker, distributed file storage with MinIO

Article table structure

Entity for the ap_article table

package com.heima.model.article.pojos;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

import java.io.Serializable;
import java.util.Date;

/**
 * Article table; stores published articles.
 *
 * @author itheima
 */
@Data
@TableName("ap_article")
public class ApArticle implements Serializable {

    @TableId(value = "id", type = IdType.ID_WORKER)
    private Long id;

    /** Title */
    private String title;

    /** Author id */
    @TableField("author_id")
    private Long authorId;

    /** Author name */
    @TableField("author_name")
    private String authorName;

    /** Channel id */
    @TableField("channel_id")
    private Integer channelId;

    /** Channel name */
    @TableField("channel_name")
    private String channelName;

    /** Layout: 0 no image, 1 single image, 2 multiple images */
    private Short layout;

    /** Flag: 0 normal, 1 hot, 2 pinned, 3 featured, 4 big-V (verified influencer) article */
    private Byte flag;

    /** Cover images; multiple images are comma-separated */
    private String images;

    /** Labels */
    private String labels;

    /** Number of likes */
    private Integer likes;

    /** Number of collections (favorites) */
    private Integer collection;

    /** Number of comments */
    private Integer comment;

    /** Number of views */
    private Integer views;

    /** Province */
    @TableField("province_id")
    private Integer provinceId;

    /** City */
    @TableField("city_id")
    private Integer cityId;

    /** County or district */
    @TableField("county_id")
    private Integer countyId;

    /** Creation time */
    @TableField("created_time")
    private Date createdTime;

    /** Publish time */
    @TableField("publish_time")
    private Date publishTime;

    /** Sync status */
    @TableField("sync_status")
    private Boolean syncStatus;

    /** Origin (source) */
    private Boolean origin;

    /** URL of the static page */
    @TableField("static_url")
    private String staticUrl;
}

Entity for the ap_article_config table (article configuration)

package com.heima.model.article.pojos;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

import java.io.Serializable;

/**
 * Configuration table for articles published to the app.
 *
 * @author itheima
 */
@Data
@TableName("ap_article_config")
public class ApArticleConfig implements Serializable {

    @TableId(value = "id", type = IdType.ID_WORKER)
    private Long id;

    /** Article id */
    @TableField("article_id")
    private Long articleId;

    /** Whether comments are allowed: true (1) allowed, false (0) not allowed */
    @TableField("is_comment")
    private Boolean isComment;

    /** Whether forwarding is allowed: true (1) allowed, false (0) not allowed */
    @TableField("is_forward")
    private Boolean isForward;

    /** Whether the article is taken down: true (1) taken down, false (0) still listed */
    @TableField("is_down")
    private Boolean isDown;

    /** Whether the article is deleted: true (1) deleted, false (0) not deleted */
    @TableField("is_delete")
    private Boolean isDelete;
}

Entity for the ap_article_content table (article content)

package com.heima.model.article.pojos;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

import java.io.Serializable;

@Data
@TableName("ap_article_content")
public class ApArticleContent implements Serializable {

    @TableId(value = "id", type = IdType.ID_WORKER)
    private Long id;

    /** Article id */
    @TableField("article_id")
    private Long articleId;

    /** Article content */
    private String content;
}

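Introducing FreeMarker

About FreeMarker

FreeMarker is a template engine: a general-purpose tool that generates text output (HTML pages, emails, configuration files, source code, and so on) from templates and changing data. It is not aimed at end users; it is a Java library that programmers embed in the products they build.

Templates are written in the FreeMarker Template Language (FTL), a simple, special-purpose language, not a full programming language like PHP. That means the data to display is prepared in a real programming language (database queries, business calculations), and the template then displays the prepared data. Inside the template you focus on how to present the data; outside it you focus on what data to present.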
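The split between "template" and "data model" can be illustrated without FreeMarker at all. The following is a toy sketch using only the JDK: it replaces `${name}` placeholders with values from a map. The class name `TemplateSketch` and its `render` method are made up for this illustration and are not part of FreeMarker's API; real FTL supports far more (lists, conditionals, escaping).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Toy illustration only: NOT the FreeMarker API.
public class TemplateSketch {

    // Matches placeholders of the form ${name}
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{(\\w+)\\}");

    /** Fills every ${name} placeholder in the template with model.get(name). */
    public static String render(String template, Map<String, Object> model) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            Object value = model.get(matcher.group(1));
            if (value == null) {
                throw new IllegalArgumentException("Missing value for " + matcher.group(1));
            }
            // quoteReplacement keeps '$' and '\' in the value from being interpreted
            matcher.appendReplacement(out, Matcher.quoteReplacement(String.valueOf(value)));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        // The data is prepared outside the template...
        Map<String, Object> model = new HashMap<>();
        model.put("title", "Hello");
        // ...and the template only decides how to present it.
        System.out.println(render("<h1>${title}</h1>", model));
    }
}
```

The point of the sketch is the separation of concerns: the caller decides what data goes into the model, and the template decides how it is laid out.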

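What other Java template engines are commonly used?

JSP, FreeMarker, Thymeleaf, Velocity, and others.

1. JSP is tied to the Servlet container and cannot be used on its own.

2. Thymeleaf is a newer, feature-rich technology, but it is generally considered slower at rendering.

3. Velocity had no release between 1.7 (2010) and 2.0 (2017). Spring Boot deprecated Velocity support in 1.4 and later removed it, so the 2017 release came too late.

Article detail page

4. Add the dependencies to the article microservice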

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-freemarker</artifactId>
    </dependency>
    <dependency>
        <groupId>com.heima</groupId>
        <artifactId>heima-file-starter</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
</dependencies>

5. Create ApArticleContentMapper

package com.heima.article.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.heima.model.article.pojos.ApArticleContent;
import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface ApArticleContentMapper extends BaseMapper<ApArticleContent> {
}

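6. Add a test class to the article microservice (the static detail page will later be generated when an article is created; for now it is generated manually)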

package com.heima.article.test;


import com.alibaba.fastjson.JSONArray;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.heima.article.ArticleApplication;
import com.heima.article.mapper.ApArticleContentMapper;
import com.heima.article.mapper.ApArticleMapper;
import com.heima.file.service.FileStorageService;
import com.heima.model.article.pojos.ApArticle;
import com.heima.model.article.pojos.ApArticleContent;
import freemarker.template.Configuration;
import freemarker.template.Template;
import org.apache.commons.lang3.StringUtils;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.Map;

@SpringBootTest(classes = ArticleApplication.class)
@RunWith(SpringRunner.class)
public class ArticleFreemarkerTest {

    @Autowired
    private Configuration configuration;

    @Autowired
    private FileStorageService fileStorageService;

    @Autowired
    private ApArticleMapper apArticleMapper;

    @Autowired
    private ApArticleContentMapper apArticleContentMapper;

    @Test
    public void createStaticUrlTest() throws Exception {
        // 1. Load the article content
        ApArticleContent apArticleContent = apArticleContentMapper.selectOne(
                Wrappers.<ApArticleContent>lambdaQuery().eq(ApArticleContent::getArticleId, 1390536764510310401L));
        if (apArticleContent != null && StringUtils.isNotBlank(apArticleContent.getContent())) {
            // 2. Render the content to HTML with FreeMarker
            StringWriter out = new StringWriter();
            Template template = configuration.getTemplate("article.ftl");

            Map<String, Object> params = new HashMap<>();
            params.put("content", JSONArray.parseArray(apArticleContent.getContent()));

            template.process(params, out);
            // Encode explicitly as UTF-8 instead of relying on the platform default charset
            InputStream is = new ByteArrayInputStream(
                    out.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));

            // 3. Upload the HTML file to MinIO
            String path = fileStorageService.uploadHtmlFile("", apArticleContent.getArticleId() + ".html", is);

            // 4. Update ap_article and save the static_url field
            ApArticle article = new ApArticle();
            article.setId(apArticleContent.getArticleId());
            article.setStaticUrl(path);
            apArticleMapper.updateById(article);
        }
    }
}

Stage 2: publishing we-media articles

We-media material management

Entity class:

package com.heima.model.wemedia.pojos;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

import java.io.Serializable;
import java.util.Date;

/**
 * We-media image and text material table.
 *
 * @author itheima
 */
@Data
@TableName("wm_material")
public class WmMaterial implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Primary key */
    @TableId(value = "id", type = IdType.AUTO)
    private Integer id;

    /** We-media user id */
    @TableField("user_id")
    private Integer userId;

    /** Image URL */
    @TableField("url")
    private String url;

    /** Material type: 0 image, 1 video */
    @TableField("type")
    private Short type;

    /** Whether the material is collected (favorited) */
    @TableField("is_collection")
    private Short isCollection;

    /** Creation time */
    @TableField("created_time")
    private Date createdTime;

}

①: Create WmMaterialController

@RestController
@RequestMapping("/api/v1/material")
public class WmMaterialController {


@PostMapping("/upload_picture")
public ResponseResult uploadPicture(MultipartFile multipartFile){
return null;
}

}

②: Mapper

package com.heima.wemedia.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.heima.model.wemedia.pojos.WmMaterial;
import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface WmMaterialMapper extends BaseMapper<WmMaterial> {
}

③: Service layer:

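package com.heima.wemedia.service;

import com.baomidou.mybatisplus.extension.service.IService;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.wemedia.pojos.WmMaterial;
import org.springframework.web.multipart.MultipartFile;

public interface WmMaterialService extends IService<WmMaterial> {

    /**
     * Upload a picture.
     * @param multipartFile the uploaded image file
     * @return the saved material record
     */
    public ResponseResult uploadPicture(MultipartFile multipartFile);

}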

Service implementation:

package com.heima.wemedia.service.impl;


import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.heima.file.service.FileStorageService;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.common.enums.AppHttpCodeEnum;
import com.heima.model.wemedia.pojos.WmMaterial;
import com.heima.utils.thread.WmThreadLocalUtil;
import com.heima.wemedia.mapper.WmMaterialMapper;
import com.heima.wemedia.service.WmMaterialService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.multipart.MultipartFile;

import java.io.IOException;
import java.util.Date;
import java.util.UUID;

@Slf4j
@Service
@Transactional
public class WmMaterialServiceImpl extends ServiceImpl<WmMaterialMapper, WmMaterial> implements WmMaterialService {

@Autowired
private FileStorageService fileStorageService;


    /**
     * Upload a picture.
     * @param multipartFile the uploaded image file
     * @return the saved material record
     */
    @Override
    public ResponseResult uploadPicture(MultipartFile multipartFile) {

        // 1. Validate the input
        if (multipartFile == null || multipartFile.getSize() == 0) {
            return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
        }

        // 2. Upload the picture to MinIO under a random name
        String fileName = UUID.randomUUID().toString().replace("-", "");
        // Keep the extension of the original name (e.g. "aa.jpg" -> ".jpg");
        // use none if the name is missing or contains no dot
        String originalFilename = multipartFile.getOriginalFilename();
        String postfix = "";
        if (originalFilename != null && originalFilename.lastIndexOf('.') >= 0) {
            postfix = originalFilename.substring(originalFilename.lastIndexOf('.'));
        }
        String fileId;
        try {
            fileId = fileStorageService.uploadImgFile("", fileName + postfix, multipartFile.getInputStream());
            log.info("Uploaded picture to MinIO, fileId:{}", fileId);
        } catch (IOException e) {
            log.error("WmMaterialServiceImpl: file upload failed", e);
            // Stop here so that no material record is saved without a file URL
            throw new RuntimeException("WmMaterialServiceImpl: file upload failed", e);
        }

        // 3. Save the material record to the database
        WmMaterial wmMaterial = new WmMaterial();
        wmMaterial.setUserId(WmThreadLocalUtil.getUser().getId());
        wmMaterial.setUrl(fileId);
        wmMaterial.setIsCollection((short) 0);
        wmMaterial.setType((short) 0);
        wmMaterial.setCreatedTime(new Date());
        save(wmMaterial);

        // 4. Return the result
        return ResponseResult.okResult(wmMaterial);
    }

}
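The file-naming step of the upload (random name plus the original extension) is easy to get wrong when the original name is missing or has no dot. A minimal, JDK-only sketch of that step follows; the class `UploadNameSketch` and its method names are hypothetical helpers for illustration, not part of the project.

```java
import java.util.UUID;

// Hypothetical helper illustrating the naming step of the upload.
public class UploadNameSketch {

    /** Returns the extension including the dot (".jpg"), or "" when there is none. */
    public static String extensionOf(String originalFilename) {
        if (originalFilename == null) {
            return "";
        }
        int dot = originalFilename.lastIndexOf('.');
        // No dot at all, or a trailing dot such as "name."
        if (dot < 0 || dot == originalFilename.length() - 1) {
            return "";
        }
        return originalFilename.substring(dot);
    }

    /** Builds a storage name: 32 hex characters from a random UUID plus the extension. */
    public static String storageName(String originalFilename) {
        String base = UUID.randomUUID().toString().replace("-", "");
        return base + extensionOf(originalFilename);
    }

    public static void main(String[] args) {
        System.out.println(storageName("aa.jpg"));
        System.out.println(storageName("no-extension"));
    }
}
```

Using a random name avoids collisions between users who upload files with the same original name, while keeping the extension lets the storage layer and browsers infer the content type.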

④: Controller

@RestController
@RequestMapping("/api/v1/material")
public class WmMaterialController {

@Autowired
private WmMaterialService wmMaterialService;


@PostMapping("/upload_picture")
public ResponseResult uploadPicture(MultipartFile multipartFile){
return wmMaterialService.uploadPicture(multipartFile);
}

}

①: Add a method to the WmMaterialController class

@PostMapping("/list")
public ResponseResult findList(@RequestBody WmMaterialDto dto){
return null;
}

②: The mapper is already defined

③: Service layer

Add a method to WmMaterialService

    /**
     * Query the material list.
     * @param dto paging and filter parameters
     * @return one page of materials
     */
    public ResponseResult findList(WmMaterialDto dto);

Implementation:

    /**
     * Query the material list.
     * @param dto paging and filter parameters
     * @return one page of materials
     */
    @Override
    public ResponseResult findList(WmMaterialDto dto) {

        // 1. Validate the paging parameters
        dto.checkParam();

        // 2. Paged query
        IPage<WmMaterial> page = new Page<>(dto.getPage(), dto.getSize());
        LambdaQueryWrapper<WmMaterial> lambdaQueryWrapper = new LambdaQueryWrapper<>();
        // Filter by "collected" only when the caller asks for collected materials
        if (dto.getIsCollection() != null && dto.getIsCollection() == 1) {
            lambdaQueryWrapper.eq(WmMaterial::getIsCollection, dto.getIsCollection());
        }

        // Only the current user's materials
        lambdaQueryWrapper.eq(WmMaterial::getUserId, WmThreadLocalUtil.getUser().getId());

        // Newest first
        lambdaQueryWrapper.orderByDesc(WmMaterial::getCreatedTime);

        page = page(page, lambdaQueryWrapper);

        // 3. Return the result
        ResponseResult responseResult = new PageResponseResult(dto.getPage(), dto.getSize(), (int) page.getTotal());
        responseResult.setData(page.getRecords());
        return responseResult;
    }
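The source calls `dto.checkParam()` but does not show what it does. A plausible reading is that it clamps the page number and page size to sane defaults before the query runs. The sketch below is an assumption, not the project's actual DTO: the class `PageRequestSketch`, the default values 1 and 10, and the upper bound of 100 are all chosen for illustration.

```java
// Hypothetical stand-in for a paging DTO; the defaults and limits are assumptions.
public class PageRequestSketch {

    private Integer page;
    private Integer size;

    public PageRequestSketch(Integer page, Integer size) {
        this.page = page;
        this.size = size;
    }

    /** Replaces missing or out-of-range values with defaults. */
    public void checkParam() {
        if (page == null || page < 1) {
            page = 1;
        }
        if (size == null || size < 1 || size > 100) {
            size = 10;
        }
    }

    /** Row offset a paged query would skip; call checkParam() first. */
    public long offset() {
        return (long) (page - 1) * size;
    }

    public Integer getPage() {
        return page;
    }

    public Integer getSize() {
        return size;
    }

    public static void main(String[] args) {
        PageRequestSketch request = new PageRequestSketch(null, 500);
        request.checkParam();
        System.out.println(request.getPage() + " / " + request.getSize() + " / offset " + request.offset());
    }
}
```

Capping the page size protects the database from a client that asks for an arbitrarily large page.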

④: Controller:

@PostMapping("/list")
public ResponseResult findList(@RequestBody WmMaterialDto dto){
return wmMaterialService.findList(dto);
}

⑤: Add the mybatis-plus pagination interceptor to the we-media service's bootstrap class

@Bean
public MybatisPlusInterceptor mybatisPlusInterceptor() {
    MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
    interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));
    return interceptor;
}

We-media article management

Query all channels

Interface definition:

package com.heima.wemedia.controller.v1;

import com.heima.model.common.dtos.ResponseResult;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/v1/channel")
public class WmchannelController {


@GetMapping("/channels")
public ResponseResult findAll(){
return null;
}
}

mapper

package com.heima.wemedia.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.heima.model.wemedia.pojos.WmChannel;
import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface WmChannelMapper extends BaseMapper<WmChannel> {
}

service

package com.heima.wemedia.service;

import com.baomidou.mybatisplus.extension.service.IService;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.wemedia.pojos.WmChannel;

public interface WmChannelService extends IService<WmChannel> {

    /**
     * Query all channels.
     * @return every channel
     */
    public ResponseResult findAll();


}

Implementation class

package com.heima.wemedia.service.impl;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.wemedia.pojos.WmChannel;
import com.heima.wemedia.mapper.WmChannelMapper;
import com.heima.wemedia.service.WmChannelService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
@Transactional
@Slf4j
public class WmChannelServiceImpl extends ServiceImpl<WmChannelMapper, WmChannel> implements WmChannelService {


    /**
     * Query all channels.
     * @return every channel
     */
    @Override
    public ResponseResult findAll() {
        return ResponseResult.okResult(list());
    }
}

Controller

package com.heima.wemedia.controller.v1;

import com.heima.model.common.dtos.ResponseResult;
import com.heima.wemedia.service.WmChannelService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/v1/channel")
public class WmchannelController {

@Autowired
private WmChannelService wmChannelService;

@GetMapping("/channels")
public ResponseResult findAll(){
return wmChannelService.findAll();
}
}

Query we-media articles

①: Add WmNewsController

package com.heima.wemedia.controller.v1;

import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.wemedia.dtos.WmNewsPageReqDto;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/v1/news")
public class WmNewsController {


@PostMapping("/list")
public ResponseResult findAll(@RequestBody WmNewsPageReqDto dto){
return null;
}

}

②: Add WmNewsMapper

package com.heima.wemedia.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.heima.model.wemedia.pojos.WmNews;
import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface WmNewsMapper extends BaseMapper<WmNews> {

}

③: Add WmNewsService

package com.heima.wemedia.service;


import com.baomidou.mybatisplus.extension.service.IService;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.wemedia.dtos.WmNewsPageReqDto;
import com.heima.model.wemedia.pojos.WmNews;

public interface WmNewsService extends IService<WmNews> {

    /**
     * Query articles.
     * @param dto paging and filter parameters
     * @return one page of the current user's articles
     */
    public ResponseResult findAll(WmNewsPageReqDto dto);

}

Implementation class:

package com.heima.wemedia.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.heima.model.common.dtos.PageResponseResult;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.common.enums.AppHttpCodeEnum;
import com.heima.model.wemedia.dtos.WmNewsPageReqDto;
import com.heima.model.wemedia.pojos.WmNews;
import com.heima.model.wemedia.pojos.WmUser;
import com.heima.utils.thread.WmThreadLocalUtil;
import com.heima.wemedia.mapper.WmNewsMapper;
import com.heima.wemedia.service.WmNewsService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
@Slf4j
@Transactional
public class WmNewsServiceImpl extends ServiceImpl<WmNewsMapper, WmNews> implements WmNewsService {

    /**
     * Query articles.
     * @param dto paging and filter parameters
     * @return one page of the current user's articles
     */
    @Override
    public ResponseResult findAll(WmNewsPageReqDto dto) {

        // 1. Validate the input
        if (dto == null) {
            return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
        }
        // Validate the paging parameters
        dto.checkParam();
        // Get the currently logged-in user
        WmUser user = WmThreadLocalUtil.getUser();
        if (user == null) {
            return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);
        }

        // 2. Paged query with optional conditions
        IPage<WmNews> page = new Page<>(dto.getPage(), dto.getSize());
        LambdaQueryWrapper<WmNews> lambdaQueryWrapper = new LambdaQueryWrapper<>();
        // Exact match on status
        if (dto.getStatus() != null) {
            lambdaQueryWrapper.eq(WmNews::getStatus, dto.getStatus());
        }

        // Exact match on channel
        if (dto.getChannelId() != null) {
            lambdaQueryWrapper.eq(WmNews::getChannelId, dto.getChannelId());
        }

        // Publish time within a range
        if (dto.getBeginPubDate() != null && dto.getEndPubDate() != null) {
            lambdaQueryWrapper.between(WmNews::getPublishTime, dto.getBeginPubDate(), dto.getEndPubDate());
        }

        // Fuzzy match of the keyword against the title
        if (StringUtils.isNotBlank(dto.getKeyword())) {
            lambdaQueryWrapper.like(WmNews::getTitle, dto.getKeyword());
        }

        // Only the current user's articles
        lambdaQueryWrapper.eq(WmNews::getUserId, user.getId());

        // Order by creation time, newest first
        lambdaQueryWrapper.orderByDesc(WmNews::getCreatedTime);

        page = page(page, lambdaQueryWrapper);

        // 3. Return the result
        ResponseResult responseResult = new PageResponseResult(dto.getPage(), dto.getSize(), (int) page.getTotal());
        responseResult.setData(page.getRecords());

        return responseResult;
    }

}

④: Controller

package com.heima.wemedia.controller.v1;

import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.wemedia.dtos.WmNewsPageReqDto;
import com.heima.wemedia.service.WmNewsService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/v1/news")
public class WmNewsController {


@Autowired
private WmNewsService wmNewsService;

@PostMapping("/list")
public ResponseResult findAll(@RequestBody WmNewsPageReqDto dto){
return wmNewsService.findAll(dto);
}

}

Publishing an article

①: Add a method to WmNewsController

@PostMapping("/submit")
public ResponseResult submitNews(@RequestBody WmNewsDto dto){
return null;
}

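②: Add the WmNewsMaterialMapper class. The relations between an article and its materials must be saved in a batch, so both a mapper interface and its XML mapping file are needed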

package com.heima.wemedia.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.heima.model.wemedia.pojos.WmNewsMaterial;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

import java.util.List;

@Mapper
public interface WmNewsMaterialMapper extends BaseMapper<WmNewsMaterial> {

void saveRelations(@Param("materialIds") List<Integer> materialIds,@Param("newsId") Integer newsId, @Param("type")Short type);
}

WmNewsMaterialMapper.xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.heima.wemedia.mapper.WmNewsMaterialMapper">

    <insert id="saveRelations">
        insert into wm_news_material (material_id,news_id,type,ord)
        values
        <foreach collection="materialIds" index="ord" item="mid" separator=",">
            (#{mid},#{newsId},#{type},#{ord})
        </foreach>
    </insert>

</mapper>
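To see what the `<foreach>` element produces, it can help to expand it by hand. For a list, MyBatis binds `index` to the zero-based position, so `ord` records the order of each material within the article. The sketch below builds the equivalent statement text in plain Java; the class `SaveRelationsSketch` is hypothetical, and it inlines the values only for display (MyBatis itself binds them as prepared-statement parameters).

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical illustration of how the <foreach> in saveRelations expands.
public class SaveRelationsSketch {

    public static String expand(List<Integer> materialIds, int newsId, short type) {
        // separator="," in the mapper corresponds to the joiner's delimiter
        StringJoiner values = new StringJoiner(",");
        for (int ord = 0; ord < materialIds.size(); ord++) {
            // item="mid" is the element, index="ord" is its zero-based position
            values.add("(" + materialIds.get(ord) + "," + newsId + "," + type + "," + ord + ")");
        }
        return "insert into wm_news_material (material_id,news_id,type,ord) values " + values;
    }

    public static void main(String[] args) {
        System.out.println(expand(Arrays.asList(7, 9), 100, (short) 0));
    }
}
```

One statement with several value tuples replaces one round trip per row. An empty list would produce invalid SQL, so the caller should check that the list is not empty before calling the mapper.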

③: Prepare the constants class

package com.heima.common.constants;

public class WemediaConstants {

    // Material collection state
    public static final Short COLLECT_MATERIAL = 1;        // collected
    public static final Short CANCEL_COLLECT_MATERIAL = 0; // not collected

    // Value of the "type" key for image blocks in the article content JSON
    public static final String WM_NEWS_TYPE_IMAGE = "image";

    // Cover types
    public static final Short WM_NEWS_NONE_IMAGE = 0;
    public static final Short WM_NEWS_SINGLE_IMAGE = 1;
    public static final Short WM_NEWS_MANY_IMAGE = 3;
    public static final Short WM_NEWS_TYPE_AUTO = -1;

    // Where a material is referenced: in the content or as a cover
    public static final Short WM_CONTENT_REFERENCE = 0;
    public static final Short WM_COVER_REFERENCE = 1;
}

④: Add a method to WmNewsService

    /**
     * Publish an article or save it as a draft.
     * @param dto article data submitted by the author
     * @return the operation result
     */
    public ResponseResult submitNews(WmNewsDto dto);

Implementation:

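    /**
     * Publish or update an article, or save it as a draft.
     * @param dto article data submitted by the author
     * @return the operation result
     */
    @Override
    public ResponseResult submitNews(WmNewsDto dto) {

        // 0. Validate the input
        if (dto == null || dto.getContent() == null) {
            return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
        }

        // 1. Save or update the article
        WmNews wmNews = new WmNews();
        // Copies only the properties whose name and type match
        BeanUtils.copyProperties(dto, wmNews);
        // Cover images: list -> comma-separated string,
        // e.g. [1dddfsd.jpg, sdlfjldk.jpg] -> 1dddfsd.jpg,sdlfjldk.jpg
        if (dto.getImages() != null && dto.getImages().size() > 0) {
            String imageStr = StringUtils.join(dto.getImages(), ",");
            wmNews.setImages(imageStr);
        }
        // Cover type "auto" (-1) is resolved later, so store null for now.
        // Comparing from the constant avoids a NullPointerException when the type is missing.
        if (WemediaConstants.WM_NEWS_TYPE_AUTO.equals(dto.getType())) {
            wmNews.setType(null);
        }

        saveOrUpdateWmNews(wmNews);

        // 2. A draft needs no material relations, so stop here
        if (dto.getStatus().equals(WmNews.Status.NORMAL.getCode())) {
            return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
        }

        // 3. Not a draft: save the relations between the content images and the materials
        // Extract the image URLs from the article content
        List<String> materials = ectractUrlInfo(dto.getContent());
        saveRelativeInfoForContent(materials, wmNews.getId());

        // 4. Not a draft: save the relations between the cover images and the materials;
        //    if the cover type is "auto", pick the cover images from the content
        saveRelativeInfoForCover(dto, wmNews, materials);

        return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
    }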

    /**
     * First task: if the cover type is "auto", derive the cover type and cover images.
     * Matching rules:
     * 1. one or two content images: single image, type 1
     * 2. three or more content images: multiple images, type 3
     * 3. no content images: no image, type 0
     *
     * Second task: save the relations between the cover images and the materials.
     * @param dto article data submitted by the author
     * @param wmNews the saved article
     * @param materials image URLs extracted from the content
     */
    private void saveRelativeInfoForCover(WmNewsDto dto, WmNews wmNews, List<String> materials) {

        List<String> images = dto.getImages();

        // Cover type "auto": derive the type and images from the content
        if (WemediaConstants.WM_NEWS_TYPE_AUTO.equals(dto.getType())) {
            if (materials.size() >= 3) {
                // Multiple images
                wmNews.setType(WemediaConstants.WM_NEWS_MANY_IMAGE);
                images = materials.stream().limit(3).collect(Collectors.toList());
            } else if (!materials.isEmpty()) {
                // Single image
                wmNews.setType(WemediaConstants.WM_NEWS_SINGLE_IMAGE);
                images = materials.stream().limit(1).collect(Collectors.toList());
            } else {
                // No image
                wmNews.setType(WemediaConstants.WM_NEWS_NONE_IMAGE);
            }

            // Update the article with the derived cover
            if (images != null && images.size() > 0) {
                wmNews.setImages(StringUtils.join(images, ","));
            }
            updateById(wmNews);
        }
        if (images != null && images.size() > 0) {
            saveRelativeInfo(images, wmNews.getId(), WemediaConstants.WM_COVER_REFERENCE);
        }

    }


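    /**
     * Save the relations between the content images and the materials.
     * @param materials image URLs extracted from the content
     * @param newsId id of the article
     */
    private void saveRelativeInfoForContent(List<String> materials, Integer newsId) {
        saveRelativeInfo(materials, newsId, WemediaConstants.WM_CONTENT_REFERENCE);
    }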

@Autowired
private WmMaterialMapper wmMaterialMapper;

    /**
     * Save the relations between an article's images and the materials.
     * @param materials image URLs
     * @param newsId id of the article
     * @param type reference type: content or cover
     */
    private void saveRelativeInfo(List<String> materials, Integer newsId, Short type) {
        if (materials != null && !materials.isEmpty()) {
            // The same image may be used more than once, so compare against distinct URLs
            List<String> urls = materials.stream().distinct().collect(Collectors.toList());

            // Look up the material ids by image URL
            List<WmMaterial> dbMaterials = wmMaterialMapper.selectList(
                    Wrappers.<WmMaterial>lambdaQuery().in(WmMaterial::getUrl, urls));

            // Every URL must match a stored material. Throwing here does two things:
            // it tells the caller that a material is no longer valid, and it rolls back the transaction.
            if (dbMaterials == null || dbMaterials.size() != urls.size()) {
                throw new CustomException(AppHttpCodeEnum.MATERIASL_REFERENCE_FAIL);
            }

            List<Integer> idList = dbMaterials.stream().map(WmMaterial::getId).collect(Collectors.toList());

            // Batch insert
            wmNewsMaterialMapper.saveRelations(idList, newsId, type);
        }

    }


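    /**
     * Extract the image URLs from the article content.
     * @param content article content as a JSON array of {type, value} blocks
     * @return the URLs of all image blocks, in order
     */
    private List<String> ectractUrlInfo(String content) {
        List<String> materials = new ArrayList<>();

        List<Map> maps = JSON.parseArray(content, Map.class);
        for (Map map : maps) {
            // Comparing from the constant is safe even when a block has no "type" key
            if (WemediaConstants.WM_NEWS_TYPE_IMAGE.equals(map.get("type"))) {
                String imgUrl = (String) map.get("value");
                materials.add(imgUrl);
            }
        }

        return materials;
    }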

@Autowired
private WmNewsMaterialMapper wmNewsMaterialMapper;

    /**
     * Save a new article or update an existing one.
     * @param wmNews the article to save
     */
    private void saveOrUpdateWmNews(WmNews wmNews) {
        // Fill in the remaining properties
        wmNews.setUserId(WmThreadLocalUtil.getUser().getId());
        wmNews.setCreatedTime(new Date());
        wmNews.setSubmitedTime(new Date());
        wmNews.setEnable((short) 1); // listed by default

        if (wmNews.getId() == null) {
            // Insert
            save(wmNews);
        } else {
            // Update: first delete the existing article-to-material relations
            wmNewsMaterialMapper.delete(Wrappers.<WmNewsMaterial>lambdaQuery().eq(WmNewsMaterial::getNewsId, wmNews.getId()));
            updateById(wmNews);
        }

    }
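The automatic cover rule is small enough to isolate and check on its own. The following sketch restates it with only the JDK; the class `CoverTypeSketch` and its method names are hypothetical, and the constants mirror the values of WM_NEWS_NONE_IMAGE, WM_NEWS_SINGLE_IMAGE and WM_NEWS_MANY_IMAGE from the constants class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, JDK-only restatement of the "auto" cover rule.
public class CoverTypeSketch {

    static final short NONE_IMAGE = 0;
    static final short SINGLE_IMAGE = 1;
    static final short MANY_IMAGE = 3;

    /** Maps the number of images in the content to a cover type. */
    public static short coverType(int contentImageCount) {
        if (contentImageCount >= 3) {
            return MANY_IMAGE;
        }
        if (contentImageCount >= 1) {
            return SINGLE_IMAGE;
        }
        return NONE_IMAGE;
    }

    /** Picks the first 3, 1 or 0 content images as the cover, depending on the type. */
    public static List<String> coverImages(List<String> contentImages) {
        short type = coverType(contentImages.size());
        int keep = type == MANY_IMAGE ? 3 : (type == SINGLE_IMAGE ? 1 : 0);
        return new ArrayList<>(contentImages.subList(0, keep));
    }

    public static void main(String[] args) {
        List<String> images = Arrays.asList("a.jpg", "b.jpg", "c.jpg", "d.jpg");
        System.out.println(coverType(images.size()) + " " + coverImages(images));
    }
}
```

Keeping the rule in a pure function like this makes it straightforward to unit test without a database or a Spring context.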

⑤: Controller

@PostMapping("/submit")
public ResponseResult submitNews(@RequestBody WmNewsDto dto){
return wmNewsService.submitNews(dto);
}

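Stage 3: we-media articles, automatic review

Distributed IDs

As the business grows, the article table may take up a large amount of physical storage. To address this, the database will later be sharded: one database is split into several, connected through database middleware. If the table used auto-increment IDs, different shards could generate the same ID, so a distributed ID generation strategy should be used instead.

[Image omitted: image-20210505005448995]

Snowflake is a distributed ID generation algorithm open-sourced by Twitter; its result is a 64-bit long ID. The core idea: 41 bits hold a millisecond timestamp, 10 bits hold the machine ID (5 bits for the datacenter and 5 bits for the worker), 12 bits hold a per-millisecond sequence number (so each node can generate 4096 IDs per millisecond), and the remaining sign bit is always 0.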
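The bit layout described above can be sketched in a few lines of Java. This is a minimal illustration under stated assumptions, not the generator that mybatis-plus ships: the class name `SnowflakeSketch` and the custom epoch (2021-01-01 UTC) are chosen for the example.

```java
// Minimal snowflake sketch; the epoch and class name are assumptions for illustration.
public class SnowflakeSketch {

    // Bit widths from the text: 5-bit datacenter, 5-bit worker, 12-bit sequence
    private static final long SEQUENCE_BITS = 12L;
    private static final long WORKER_BITS = 5L;
    private static final long DATACENTER_BITS = 5L;

    private static final long MAX_SEQUENCE = (1L << SEQUENCE_BITS) - 1;     // 4095
    private static final long MAX_WORKER = (1L << WORKER_BITS) - 1;         // 31
    private static final long MAX_DATACENTER = (1L << DATACENTER_BITS) - 1; // 31

    private static final long WORKER_SHIFT = SEQUENCE_BITS;
    private static final long DATACENTER_SHIFT = SEQUENCE_BITS + WORKER_BITS;
    private static final long TIMESTAMP_SHIFT = SEQUENCE_BITS + WORKER_BITS + DATACENTER_BITS;

    // Assumed custom epoch: 2021-01-01T00:00:00Z in milliseconds
    private static final long EPOCH = 1609459200000L;

    private final long datacenterId;
    private final long workerId;
    private long lastTimestamp = -1L;
    private long sequence = 0L;

    public SnowflakeSketch(long datacenterId, long workerId) {
        if (datacenterId < 0 || datacenterId > MAX_DATACENTER) {
            throw new IllegalArgumentException("datacenterId must be in 0-31");
        }
        if (workerId < 0 || workerId > MAX_WORKER) {
            throw new IllegalArgumentException("workerId must be in 0-31");
        }
        this.datacenterId = datacenterId;
        this.workerId = workerId;
    }

    public synchronized long nextId() {
        long now = System.currentTimeMillis();
        if (now < lastTimestamp) {
            throw new IllegalStateException("Clock moved backwards");
        }
        if (now == lastTimestamp) {
            sequence = (sequence + 1) & MAX_SEQUENCE;
            if (sequence == 0) {
                // All 4096 sequence numbers of this millisecond are used: wait for the next one
                while ((now = System.currentTimeMillis()) <= lastTimestamp) {
                    // busy-wait
                }
            }
        } else {
            sequence = 0L;
        }
        lastTimestamp = now;
        return compose(now, datacenterId, workerId, sequence);
    }

    /** Packs the four fields into one long: timestamp | datacenter | worker | sequence. */
    public static long compose(long timestampMillis, long datacenterId, long workerId, long sequence) {
        return ((timestampMillis - EPOCH) << TIMESTAMP_SHIFT)
                | (datacenterId << DATACENTER_SHIFT)
                | (workerId << WORKER_SHIFT)
                | sequence;
    }

    public static void main(String[] args) {
        SnowflakeSketch generator = new SnowflakeSketch(1, 1);
        System.out.println(generator.nextId());
        System.out.println(generator.nextId());
    }
}
```

Because the timestamp occupies the high bits, IDs from one node increase over time, and the datacenter and worker bits keep IDs from different nodes from colliding as long as each node has a unique pair.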

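[Image omitted: image-20210505005509258]

All article-side tables use snowflake IDs, including ap_article, ap_article_config, and ap_article_content.

mybatis-plus already integrates the snowflake algorithm; the following two steps are enough to use it in the project.

First: add the following configuration to the id field of the entity class, specifying the type ID_WORKER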

@TableId(value = "id",type = IdType.ID_WORKER)
private Long id;

Second: configure the datacenter id and worker id in application.yml

mybatis-plus:
  mapper-locations: classpath*:mapper/*.xml
  # Package scanned for type aliases; classes in this package are registered as aliases
  type-aliases-package: com.heima.model.article.pojos
  global-config:
    datacenter-id: 1
    workerId: 1

datacenter-id: datacenter id (range: 0-31)

workerId: worker id (range: 0-31)

Implementing automatic review of we-media articles

package com.heima.wemedia.service.impl;

import com.alibaba.fastjson.JSONArray;
import com.heima.apis.article.IArticleClient;
import com.heima.common.aliyun.GreenImageScan;
import com.heima.common.aliyun.GreenTextScan;
import com.heima.file.service.FileStorageService;
import com.heima.model.article.dtos.ArticleDto;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.wemedia.pojos.WmChannel;
import com.heima.model.wemedia.pojos.WmNews;
import com.heima.model.wemedia.pojos.WmUser;
import com.heima.wemedia.mapper.WmChannelMapper;
import com.heima.wemedia.mapper.WmNewsMapper;
import com.heima.wemedia.mapper.WmUserMapper;
import com.heima.wemedia.service.WmNewsAutoScanService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.*;
import java.util.stream.Collectors;


@Service
@Slf4j
@Transactional
public class WmNewsAutoScanServiceImpl implements WmNewsAutoScanService {

    @Autowired
    private WmNewsMapper wmNewsMapper;

    /**
     * Automatically review a we-media article.
     *
     * @param id we-media article ID
     */
    @Override
    public void autoScanWmNews(Integer id) {
        // 1. Load the we-media article
        WmNews wmNews = wmNewsMapper.selectById(id);
        if (wmNews == null) {
            throw new RuntimeException("WmNewsAutoScanServiceImpl: article not found");
        }

        if (wmNews.getStatus().equals(WmNews.Status.SUBMIT.getCode())) {
            // Extract the plain text and the images from the content
            Map<String, Object> textAndImages = handleTextAndImages(wmNews);

            // 2. Review the text via the Aliyun API
            boolean isTextScan = handleTextScan((String) textAndImages.get("content"), wmNews);
            if (!isTextScan) return;

            // 3. Review the images via the Aliyun API
            boolean isImageScan = handleImageScan((List<String>) textAndImages.get("images"), wmNews);
            if (!isImageScan) return;

            // 4. Review passed: save the article data for the app side
            ResponseResult responseResult = saveAppArticle(wmNews);
            if (!responseResult.getCode().equals(200)) {
                throw new RuntimeException("WmNewsAutoScanServiceImpl: failed to save the app-side article data");
            }
            // Write the returned article_id back
            wmNews.setArticleId((Long) responseResult.getData());
            updateWmNews(wmNews, (short) 9, "审核成功"); // stored reason: "review passed"
        }
    }

    @Autowired
    private IArticleClient articleClient;

    @Autowired
    private WmChannelMapper wmChannelMapper;

    @Autowired
    private WmUserMapper wmUserMapper;

    /**
     * Save the article data for the app side.
     *
     * @param wmNews the reviewed we-media article
     */
    private ResponseResult saveAppArticle(WmNews wmNews) {
        ArticleDto dto = new ArticleDto();
        // Copy the matching properties
        BeanUtils.copyProperties(wmNews, dto);
        // Article layout
        dto.setLayout(wmNews.getType());
        // Channel
        WmChannel wmChannel = wmChannelMapper.selectById(wmNews.getChannelId());
        if (wmChannel != null) {
            dto.setChannelName(wmChannel.getName());
        }

        // Author
        dto.setAuthorId(wmNews.getUserId().longValue());
        WmUser wmUser = wmUserMapper.selectById(wmNews.getUserId());
        if (wmUser != null) {
            dto.setAuthorName(wmUser.getName());
        }

        // Set the article ID when the article already exists on the app side
        if (wmNews.getArticleId() != null) {
            dto.setId(wmNews.getArticleId());
        }
        dto.setCreatedTime(new Date());

        return articleClient.saveArticle(dto);
    }



    @Autowired
    private FileStorageService fileStorageService;

    @Autowired
    private GreenImageScan greenImageScan;

    /**
     * Review the images.
     *
     * @param images images of the article
     * @param wmNews the article under review
     * @return true if the images pass the review
     */
    private boolean handleImageScan(List<String> images, WmNews wmNews) {
        boolean flag = true;

        if (images == null || images.isEmpty()) {
            return flag;
        }

        // Deduplicate the images, then download them from MinIO
        images = images.stream().distinct().collect(Collectors.toList());

        List<byte[]> imageList = new ArrayList<>();
        for (String image : images) {
            byte[] bytes = fileStorageService.downLoadFile(image);
            imageList.add(bytes);
        }

        // Review the images
        try {
            Map map = greenImageScan.imageScan(imageList);
            if (map != null) {
                // Review failed (constant-first equals avoids an NPE when "suggestion" is missing)
                if ("block".equals(map.get("suggestion"))) {
                    flag = false;
                    updateWmNews(wmNews, (short) 2, "当前文章中存在违规内容"); // "the article contains prohibited content"
                }

                // Uncertain result: manual review required
                if ("review".equals(map.get("suggestion"))) {
                    flag = false;
                    updateWmNews(wmNews, (short) 3, "当前文章中存在不确定内容"); // "the article contains uncertain content"
                }
            }
        } catch (Exception e) {
            flag = false;
            log.error("Image review failed", e);
        }
        return flag;
    }

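    @Autowired
    private GreenTextScan greenTextScan;

    /**
     * Review the plain-text content.
     *
     * @param content plain text extracted from the article
     * @param wmNews  the article under review
     * @return true if the text passes the review
     */
    private boolean handleTextScan(String content, WmNews wmNews) {
        boolean flag = true;

        // Nothing to review when both the title and the content are empty.
        // (The original length check on title + "-" + content could never be zero.)
        if (StringUtils.isBlank(wmNews.getTitle()) && StringUtils.isBlank(content)) {
            return flag;
        }

        try {
            Map map = greenTextScan.greeTextScan(wmNews.getTitle() + "-" + content);
            if (map != null) {
                // Review failed
                if ("block".equals(map.get("suggestion"))) {
                    flag = false;
                    updateWmNews(wmNews, (short) 2, "当前文章中存在违规内容"); // "the article contains prohibited content"
                }

                // Uncertain result: manual review required
                if ("review".equals(map.get("suggestion"))) {
                    flag = false;
                    updateWmNews(wmNews, (short) 3, "当前文章中存在不确定内容"); // "the article contains uncertain content"
                }
            }
        } catch (Exception e) {
            flag = false;
            log.error("Text review failed", e);
        }

        return flag;
    }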

    /**
     * Update the article's status and review reason.
     *
     * @param wmNews the article
     * @param status the new status code
     * @param reason the review reason
     */
    private void updateWmNews(WmNews wmNews, short status, String reason) {
        wmNews.setStatus(status);
        wmNews.setReason(reason);
        wmNewsMapper.updateById(wmNews);
    }

    /**
     * 1. Extract the text and the images from the article content.
     * 2. Extract the article's cover images.
     *
     * @param wmNews the article
     * @return a map with the keys "content" (String) and "images" (List of String)
     */
    private Map<String, Object> handleTextAndImages(WmNews wmNews) {
        // Collects the plain text
        StringBuilder stringBuilder = new StringBuilder();

        List<String> images = new ArrayList<>();

        // 1. Extract the text and the images from the article content
        if (StringUtils.isNotBlank(wmNews.getContent())) {
            List<Map> maps = JSONArray.parseArray(wmNews.getContent(), Map.class);
            for (Map map : maps) {
                if ("text".equals(map.get("type"))) {
                    stringBuilder.append(map.get("value"));
                }

                if ("image".equals(map.get("type"))) {
                    images.add((String) map.get("value"));
                }
            }
        }
        // 2. Extract the article's cover images
        if (StringUtils.isNotBlank(wmNews.getImages())) {
            String[] split = wmNews.getImages().split(",");
            images.addAll(Arrays.asList(split));
        }

        Map<String, Object> resultMap = new HashMap<>();
        resultMap.put("content", stringBuilder.toString());
        resultMap.put("images", images);
        return resultMap;
    }
}
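
The review code above passes the status codes 2, 3, and 9 as bare `short` literals. As a minimal sketch of one way to name them, the enum below maps a review suggestion to a rejection status. The names `ReviewStatus` and `rejectionFor` are hypothetical and not part of the project; only the codes 2, 3, and 9 and the suggestion values `block` and `review` come from the code above.

```java
import java.util.Optional;

enum ReviewStatus {
    FAIL((short) 2),          // review failed
    MANUAL_REVIEW((short) 3), // uncertain, needs manual review
    PUBLISHED((short) 9);     // review passed and the article was saved

    private final short code;

    ReviewStatus(short code) {
        this.code = code;
    }

    short code() {
        return code;
    }

    // Returns the rejection status for a suggestion, or empty if the review may continue.
    static Optional<ReviewStatus> rejectionFor(String suggestion) {
        if ("block".equals(suggestion)) {
            return Optional.of(FAIL);
        }
        if ("review".equals(suggestion)) {
            return Optional.of(MANUAL_REVIEW);
        }
        return Optional.empty();
    }
}
```

With such a helper, the text and image checks could share one mapping instead of repeating the two `if` blocks.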

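New requirement: self-managed sensitive words

How the DFA works

DFA stands for Deterministic Finite Automaton.

Storage: all sensitive words are loaded at once into multiple nested maps, where each character of a word keys the map that holds its possible next characters.

Sensitive words: 冰毒, 大麻, 大坏蛋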
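
The nested-map idea can be sketched as follows. This is a minimal illustration under stated assumptions, not the project's `SensitiveWordUtil`: the class `DfaSensitiveWords`, its `Node` type, and the longest-match policy are all hypothetical choices made for the example.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

final class DfaSensitiveWords {
    // One automaton state: the transitions to next states, plus an end-of-word flag.
    private static final class Node {
        final Map<Character, Node> next = new HashMap<>();
        boolean end;
    }

    private final Node root = new Node();

    // Builds the nested maps once from the full word list.
    DfaSensitiveWords(Collection<String> words) {
        for (String word : words) {
            if (word.isEmpty()) {
                continue;
            }
            Node node = root;
            for (char c : word.toCharArray()) {
                node = node.next.computeIfAbsent(c, k -> new Node());
            }
            node.end = true;
        }
    }

    // Returns each matched word together with its number of occurrences.
    Map<String, Integer> matchWords(String text) {
        Map<String, Integer> hits = new HashMap<>();
        int i = 0;
        while (i < text.length()) {
            int len = longestMatchAt(text, i);
            if (len > 0) {
                hits.merge(text.substring(i, i + len), 1, Integer::sum);
                i += len; // continue after the matched word
            } else {
                i++;
            }
        }
        return hits;
    }

    // Length of the longest sensitive word starting at the given index, or 0 if none.
    private int longestMatchAt(String text, int start) {
        Node node = root;
        int matched = 0;
        for (int i = start; i < text.length(); i++) {
            node = node.next.get(text.charAt(i));
            if (node == null) {
                break;
            }
            if (node.end) {
                matched = i - start + 1;
            }
        }
        return matched;
    }
}
```

For the three words above, 大麻 and 大坏蛋 share the first-level entry for 大, so the text is scanned once regardless of how many words are loaded.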

package com.heima.model.wemedia.pojos;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

import java.io.Serializable;
import java.util.Date;

/**
 * Sensitive-word table.
 *
 * @author itheima
 */
@Data
@TableName("wm_sensitive")
public class WmSensitive implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Primary key
     */
    @TableId(value = "id", type = IdType.AUTO)
    private Integer id;

    /**
     * Sensitive word
     */
    @TableField("sensitives")
    private String sensitives;

    /**
     * Creation time
     */
    @TableField("created_time")
    private Date createdTime;

}

②: Copy the mapper for wm_sensitive into the project

package com.heima.wemedia.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.heima.model.wemedia.pojos.WmSensitive;
import org.apache.ibatis.annotations.Mapper;


@Mapper
public interface WmSensitiveMapper extends BaseMapper<WmSensitive> {
}

③: Add the self-managed sensitive-word check to the article review code

First, add the following code to the autoScanWmNews method of WmNewsAutoScanServiceImpl:

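// Extract the plain text and the images from the content
// ..... omitted

// Self-managed sensitive-word filter
boolean isSensitive = handleSensitiveScan((String) textAndImages.get("content"), wmNews);
if (!isSensitive) return;

// 2. Review the text via the Aliyun API
// ..... omitted

Then add the self-managed sensitive-word review method: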

@Autowired
private WmSensitiveMapper wmSensitiveMapper;

/**
 * Review the text against the self-managed sensitive words.
 *
 * @param content text to check
 * @param wmNews  the article under review
 * @return true if no sensitive word was found
 */
private boolean handleSensitiveScan(String content, WmNews wmNews) {
    boolean flag = true;

    // Load all sensitive words
    List<WmSensitive> wmSensitives = wmSensitiveMapper.selectList(Wrappers.<WmSensitive>lambdaQuery().select(WmSensitive::getSensitives));
    List<String> sensitiveList = wmSensitives.stream().map(WmSensitive::getSensitives).collect(Collectors.toList());

    // Initialize the sensitive-word dictionary
    SensitiveWordUtil.initMap(sensitiveList);

    // Check whether the text contains any sensitive word
    Map<String, Integer> map = SensitiveWordUtil.matchWords(content);
    if (map.size() > 0) {
        updateWmNews(wmNews, (short) 2, "当前文章中存在违规内容" + map); // "the article contains prohibited content" + matches
        flag = false;
    }

    return flag;
}

Finally, the complete article review code:

package com.heima.wemedia.service.impl;

import com.alibaba.fastjson.JSONArray;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.heima.apis.article.IArticleClient;
import com.heima.common.aliyun.GreenImageScan;
import com.heima.common.aliyun.GreenTextScan;
import com.heima.common.tess4j.Tess4jClient;
import com.heima.file.service.FileStorageService;
import com.heima.model.article.dtos.ArticleDto;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.wemedia.pojos.WmChannel;
import com.heima.model.wemedia.pojos.WmNews;
import com.heima.model.wemedia.pojos.WmSensitive;
import com.heima.model.wemedia.pojos.WmUser;
import com.heima.utils.common.SensitiveWordUtil;
import com.heima.wemedia.mapper.WmChannelMapper;
import com.heima.wemedia.mapper.WmNewsMapper;
import com.heima.wemedia.mapper.WmSensitiveMapper;
import com.heima.wemedia.mapper.WmUserMapper;
import com.heima.wemedia.service.WmNewsAutoScanService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.imageio.ImageIO;
import java.awt.image.BufferedImage;
import java.io.ByteArrayInputStream;
import java.util.*;
import java.util.stream.Collectors;


@Service
@Slf4j
@Transactional
public class WmNewsAutoScanServiceImpl implements WmNewsAutoScanService {

    @Autowired
    private WmNewsMapper wmNewsMapper;

    /**
     * Automatically review a we-media article.
     *
     * @param id we-media article ID
     */
    @Override
    @Async // Marks this method as asynchronous
    public void autoScanWmNews(Integer id) {
        // 1. Load the we-media article
        WmNews wmNews = wmNewsMapper.selectById(id);
        if (wmNews == null) {
            throw new RuntimeException("WmNewsAutoScanServiceImpl: article not found");
        }

        if (wmNews.getStatus().equals(WmNews.Status.SUBMIT.getCode())) {
            // Extract the plain text and the images from the content
            Map<String, Object> textAndImages = handleTextAndImages(wmNews);

            // Self-managed sensitive-word filter
            boolean isSensitive = handleSensitiveScan((String) textAndImages.get("content"), wmNews);
            if (!isSensitive) return;

            // 2. Review the text via the Aliyun API
            boolean isTextScan = handleTextScan((String) textAndImages.get("content"), wmNews);
            if (!isTextScan) return;

            // 3. Review the images via the Aliyun API
            boolean isImageScan = handleImageScan((List<String>) textAndImages.get("images"), wmNews);
            if (!isImageScan) return;

            // 4. Review passed: save the article data for the app side
            ResponseResult responseResult = saveAppArticle(wmNews);
            if (!responseResult.getCode().equals(200)) {
                throw new RuntimeException("WmNewsAutoScanServiceImpl: failed to save the app-side article data");
            }
            // Write the returned article_id back
            wmNews.setArticleId((Long) responseResult.getData());
            updateWmNews(wmNews, (short) 9, "审核成功"); // stored reason: "review passed"
        }
    }

    @Autowired
    private WmSensitiveMapper wmSensitiveMapper;

    /**
     * Review the text against the self-managed sensitive words.
     *
     * @param content text to check
     * @param wmNews  the article under review
     * @return true if no sensitive word was found
     */
    private boolean handleSensitiveScan(String content, WmNews wmNews) {
        boolean flag = true;

        // Load all sensitive words
        List<WmSensitive> wmSensitives = wmSensitiveMapper.selectList(Wrappers.<WmSensitive>lambdaQuery().select(WmSensitive::getSensitives));
        List<String> sensitiveList = wmSensitives.stream().map(WmSensitive::getSensitives).collect(Collectors.toList());

        // Initialize the sensitive-word dictionary
        SensitiveWordUtil.initMap(sensitiveList);

        // Check whether the text contains any sensitive word
        Map<String, Integer> map = SensitiveWordUtil.matchWords(content);
        if (map.size() > 0) {
            updateWmNews(wmNews, (short) 2, "当前文章中存在违规内容" + map); // "the article contains prohibited content" + matches
            flag = false;
        }

        return flag;
    }

    @Autowired
    private IArticleClient articleClient;

    @Autowired
    private WmChannelMapper wmChannelMapper;

    @Autowired
    private WmUserMapper wmUserMapper;

    /**
     * Save the article data for the app side.
     *
     * @param wmNews the reviewed we-media article
     */
    private ResponseResult saveAppArticle(WmNews wmNews) {
        ArticleDto dto = new ArticleDto();
        // Copy the matching properties
        BeanUtils.copyProperties(wmNews, dto);
        // Article layout
        dto.setLayout(wmNews.getType());
        // Channel
        WmChannel wmChannel = wmChannelMapper.selectById(wmNews.getChannelId());
        if (wmChannel != null) {
            dto.setChannelName(wmChannel.getName());
        }

        // Author
        dto.setAuthorId(wmNews.getUserId().longValue());
        WmUser wmUser = wmUserMapper.selectById(wmNews.getUserId());
        if (wmUser != null) {
            dto.setAuthorName(wmUser.getName());
        }

        // Set the article ID when the article already exists on the app side
        if (wmNews.getArticleId() != null) {
            dto.setId(wmNews.getArticleId());
        }
        dto.setCreatedTime(new Date());

        return articleClient.saveArticle(dto);
    }


    @Autowired
    private FileStorageService fileStorageService;

    @Autowired
    private GreenImageScan greenImageScan;

    @Autowired
    private Tess4jClient tess4jClient;

    /**
     * Review the images.
     *
     * @param images images of the article
     * @param wmNews the article under review
     * @return true if the images pass the review
     */
    private boolean handleImageScan(List<String> images, WmNews wmNews) {
        boolean flag = true;

        if (images == null || images.isEmpty()) {
            return flag;
        }

        // Deduplicate the images, then download them from MinIO
        images = images.stream().distinct().collect(Collectors.toList());

        List<byte[]> imageList = new ArrayList<>();

        try {
            for (String image : images) {
                byte[] bytes = fileStorageService.downLoadFile(image);

                // --- OCR text review: begin ---

                // Convert the byte[] into a BufferedImage
                ByteArrayInputStream in = new ByteArrayInputStream(bytes);
                BufferedImage imageFile = ImageIO.read(in);
                // Recognize the text in the image
                String result = tess4jClient.doOCR(imageFile);

                // Check the recognized text against the self-managed sensitive words
                boolean isSensitive = handleSensitiveScan(result, wmNews);
                if (!isSensitive) {
                    return isSensitive;
                }

                // --- OCR text review: end ---

                imageList.add(bytes);
            }
        } catch (Exception e) {
            log.error("Failed to download or OCR an image", e);
        }

        // Review the images
        try {
            Map map = greenImageScan.imageScan(imageList);
            if (map != null) {
                // Review failed (constant-first equals avoids an NPE when "suggestion" is missing)
                if ("block".equals(map.get("suggestion"))) {
                    flag = false;
                    updateWmNews(wmNews, (short) 2, "当前文章中存在违规内容"); // "the article contains prohibited content"
                }

                // Uncertain result: manual review required
                if ("review".equals(map.get("suggestion"))) {
                    flag = false;
                    updateWmNews(wmNews, (short) 3, "当前文章中存在不确定内容"); // "the article contains uncertain content"
                }
            }
        } catch (Exception e) {
            flag = false;
            log.error("Image review failed", e);
        }
        return flag;
    }

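    @Autowired
    private GreenTextScan greenTextScan;

    /**
     * Review the plain-text content.
     *
     * @param content plain text extracted from the article
     * @param wmNews  the article under review
     * @return true if the text passes the review
     */
    private boolean handleTextScan(String content, WmNews wmNews) {
        boolean flag = true;

        // Nothing to review when both the title and the content are empty.
        // (The original length check on title + "-" + content could never be zero.)
        if (StringUtils.isBlank(wmNews.getTitle()) && StringUtils.isBlank(content)) {
            return flag;
        }
        try {
            Map map = greenTextScan.greeTextScan(wmNews.getTitle() + "-" + content);
            if (map != null) {
                // Review failed
                if ("block".equals(map.get("suggestion"))) {
                    flag = false;
                    updateWmNews(wmNews, (short) 2, "当前文章中存在违规内容"); // "the article contains prohibited content"
                }

                // Uncertain result: manual review required
                if ("review".equals(map.get("suggestion"))) {
                    flag = false;
                    updateWmNews(wmNews, (short) 3, "当前文章中存在不确定内容"); // "the article contains uncertain content"
                }
            }
        } catch (Exception e) {
            flag = false;
            log.error("Text review failed", e);
        }

        return flag;
    }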

    /**
     * Update the article's status and review reason.
     *
     * @param wmNews the article
     * @param status the new status code
     * @param reason the review reason
     */
    private void updateWmNews(WmNews wmNews, short status, String reason) {
        wmNews.setStatus(status);
        wmNews.setReason(reason);
        wmNewsMapper.updateById(wmNews);
    }

    /**
     * 1. Extract the text and the images from the article content.
     * 2. Extract the article's cover images.
     *
     * @param wmNews the article
     * @return a map with the keys "content" (String) and "images" (List of String)
     */
    private Map<String, Object> handleTextAndImages(WmNews wmNews) {
        // Collects the plain text
        StringBuilder stringBuilder = new StringBuilder();

        List<String> images = new ArrayList<>();

        // 1. Extract the text and the images from the article content
        if (StringUtils.isNotBlank(wmNews.getContent())) {
            List<Map> maps = JSONArray.parseArray(wmNews.getContent(), Map.class);
            for (Map map : maps) {
                if ("text".equals(map.get("type"))) {
                    stringBuilder.append(map.get("value"));
                }

                if ("image".equals(map.get("type"))) {
                    images.add((String) map.get("value"));
                }
            }
        }
        // 2. Extract the article's cover images
        if (StringUtils.isNotBlank(wmNews.getImages())) {
            String[] split = wmNews.getImages().split(",");
            images.addAll(Arrays.asList(split));
        }

        Map<String, Object> resultMap = new HashMap<>();
        resultMap.put("content", stringBuilder.toString());
        resultMap.put("images", images);
        return resultMap;
    }
}

Article details: static file generation

package com.heima.article.service.impl;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.heima.article.mapper.ApArticleContentMapper;
import com.heima.article.service.ApArticleService;
import com.heima.article.service.ArticleFreemarkerService;
import com.heima.file.service.FileStorageService;
import com.heima.model.article.pojos.ApArticle;
import freemarker.template.Configuration;
import freemarker.template.Template;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

@Service
@Slf4j
@Transactional
public class ArticleFreemarkerServiceImpl implements ArticleFreemarkerService {

    @Autowired
    private ApArticleContentMapper apArticleContentMapper;

    @Autowired
    private Configuration configuration;

    @Autowired
    private FileStorageService fileStorageService;

    @Autowired
    private ApArticleService apArticleService;

    /**
     * Generate the static file and upload it to MinIO.
     *
     * @param apArticle the article (its ID is already known)
     * @param content   the article content
     */
    @Async
    @Override
    public void buildArticleToMinIO(ApArticle apArticle, String content) {
        // 4.1 Check the article content
        if (StringUtils.isNotBlank(content)) {
            // 4.2 Render the content into an HTML file with FreeMarker
            StringWriter out = new StringWriter();
            try {
                Template template = configuration.getTemplate("article.ftl");
                // Data model
                Map<String, Object> contentDataModel = new HashMap<>();
                contentDataModel.put("content", JSONArray.parseArray(content));
                // Merge the template with the data model
                template.process(contentDataModel, out);
            } catch (Exception e) {
                // Do not upload an empty page when rendering fails
                log.error("Failed to render article {}", apArticle.getId(), e);
                return;
            }

            // 4.3 Upload the HTML file to MinIO (explicit charset instead of the platform default)
            InputStream in = new ByteArrayInputStream(out.toString().getBytes(StandardCharsets.UTF_8));
            String path = fileStorageService.uploadHtmlFile("", apArticle.getId() + ".html", in);

            // 4.4 Update the ap_article table and store the static_url field
            apArticleService.update(Wrappers.<ApArticle>lambdaUpdate()
                    .eq(ApArticle::getId, apArticle.getId())
                    .set(ApArticle::getStaticUrl, path));
        }
    }

}

2. In the saveArticle implementation of ApArticleService, add a call to the file-generation method

/**
 * Save the article data for the app side.
 *
 * @param dto article data
 * @return the article ID
 */
@Override
public ResponseResult saveArticle(ArticleDto dto) {
    // 1. Validate the parameter
    if (dto == null) {
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    }

    ApArticle apArticle = new ApArticle();
    BeanUtils.copyProperties(dto, apArticle);

    // 2. Check whether an ID is present
    if (dto.getId() == null) {
        // 2.1 No ID: insert the article, its configuration, and its content

        // Save the article
        save(apArticle);

        // Save the configuration
        ApArticleConfig apArticleConfig = new ApArticleConfig(apArticle.getId());
        apArticleConfigMapper.insert(apArticleConfig);

        // Save the article content
        ApArticleContent apArticleContent = new ApArticleContent();
        apArticleContent.setArticleId(apArticle.getId());
        apArticleContent.setContent(dto.getContent());
        apArticleContentMapper.insert(apArticleContent);

    } else {
        // 2.2 ID present: update the article and its content

        // Update the article
        updateById(apArticle);

        // Update the article content
        ApArticleContent apArticleContent = apArticleContentMapper.selectOne(Wrappers.<ApArticleContent>lambdaQuery().eq(ApArticleContent::getArticleId, dto.getId()));
        apArticleContent.setContent(dto.getContent());
        apArticleContentMapper.updateById(apArticleContent);
    }

    // Asynchronous call: generate the static file and upload it to MinIO
    articleFreemarkerService.buildArticleToMinIO(apArticle, dto.getContent());

    // 3. Return the article ID
    return ResponseResult.okResult(apArticle.getId());
}

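Stage 4: publishing articles at a precise time with delayed tasks

Implementation:

1: In the test package jdk, create the delayed-task element class DelayedTask and implement the compareTo and getDelay methods.

2: In the main method, create a DelayQueue and add three delayed tasks to it.

3: Poll tasks from the delay queue in a loop.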

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayedTask implements Delayed {

    // Absolute execution time of the task, in epoch milliseconds
    // (a long, so the value does not overflow the way an int of seconds would)
    private final long executeTime;

    public DelayedTask(int delaySeconds) {
        this.executeTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(delaySeconds);
    }

    /**
     * Remaining time before the element may leave the queue.
     *
     * @param unit the unit of the returned value
     * @return the remaining delay in the given unit; zero or negative once the task is due
     */
    @Override
    public long getDelay(TimeUnit unit) {
        // Honor the requested unit, as the Delayed contract requires
        return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Orders the elements by remaining delay.
     *
     * @param o the element to compare with
     * @return a negative, zero, or positive value
     */
    @Override
    public int compareTo(Delayed o) {
        return Long.compare(this.getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
    }


    public static void main(String[] args) {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();

        queue.add(new DelayedTask(5));
        queue.add(new DelayedTask(10));
        queue.add(new DelayedTask(15));

        System.out.println(System.currentTimeMillis() / 1000 + " start consume ");
        while (queue.size() != 0) {
            DelayedTask delayedTask = queue.poll();
            if (delayedTask != null) {
                System.out.println(System.currentTimeMillis() / 1000 + " consume task");
            }
            // Poll once per second
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

②: Add bootstrap.yml

server:
  port: 51701
spring:
  application:
    name: leadnews-schedule
  cloud:
    nacos:
      discovery:
        server-addr: 192.168.200.130:8848
      config:
        server-addr: 192.168.200.130:8848
        file-extension: yml

③: Add the corresponding configuration in Nacos, including the database and mybatis-plus settings

spring:
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/leadnews_schedule?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
    username: root
    password: root
# Location of the XML files for the Mapper interfaces; required if a Mapper interface defines custom methods
mybatis-plus:
  mapper-locations: classpath*:mapper/*.xml
  # Package to scan for type aliases; classes in this package are registered under their alias
  type-aliases-package: com.heima.model.schedule.pojos

Scheduled refresh of future data: implementation

Add the following method to TaskService:

@Scheduled(cron = "0 */1 * * * ?")
public void refresh() {
    System.out.println(System.currentTimeMillis() / 1000 + " scheduled task executed");

    // Get the keys of all future-data sets
    Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*"); // future_*
    for (String futureKey : futureKeys) { // e.g. future_250_250

        String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];
        // Get the tasks under this key that are due now
        Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
        if (!tasks.isEmpty()) {
            // Move these tasks to the consumer queue
            cacheService.refreshWithPipeline(futureKey, topicKey, tasks);
            System.out.println("Moved the due tasks under " + futureKey + " to " + topicKey);
        }
    }
}

Add the @EnableScheduling annotation to the bootstrap class to enable task scheduling.
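
The refresh job moves every task whose score (execution time) is not later than the current time from the future set to the consumer queue. The following is a minimal in-memory sketch of that step, assuming a `TreeMap` as a stand-in for the Redis sorted set and a `Deque` for the Redis list. The class `FutureTaskRefresher` and its methods are hypothetical and do not reproduce `cacheService`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

final class FutureTaskRefresher {
    // Stand-in for the Redis sorted set: score (execution time in ms) -> task payloads.
    private final NavigableMap<Long, List<String>> future = new TreeMap<>();
    // Stand-in for the Redis list that consumers pop from.
    private final Deque<String> topic = new ArrayDeque<>();

    void schedule(String task, long executeAtMillis) {
        future.computeIfAbsent(executeAtMillis, k -> new ArrayList<>()).add(task);
    }

    // Moves every task with score <= nowMillis to the topic queue; returns the number moved.
    int refresh(long nowMillis) {
        NavigableMap<Long, List<String>> due = future.headMap(nowMillis, true);
        int moved = 0;
        for (List<String> tasks : due.values()) {
            topic.addAll(tasks);
            moved += tasks.size();
        }
        due.clear(); // the view is backed by the map, so this removes the due entries from it
        return moved;
    }

    // Returns the next task ready for consumption, or null if none is due yet.
    String poll() {
        return topic.pollFirst();
    }
}
```

In Redis the move has to remove the tasks from the sorted set and push them to the list together, which is why the original code batches both steps in `refreshWithPipeline`.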

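Using a distributed lock to prevent concurrent execution in a cluster

Redis distributed lock

The SETNX (SET if Not eXists) command sets a key to the given value only when the key does not already exist.

The locking idea: if the key does not exist, set its value; if the key already exists, SETNX does nothing.

  • Client A asks the server to set the key; if the set succeeds, A has acquired the lock
  • Client B also asks the server to set the key; if the call reports failure, B has failed to acquire the lock
  • Client A finishes its work and deletes the lock
  • Client B waits for a while and asks again to set the key; this time the set succeeds
  • Client B finishes its work and deletes the lock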
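
The acquire and release steps above can be sketched in memory as follows. This is only an illustration of the set-if-absent semantics with an expiry, assuming a `ConcurrentHashMap` in place of Redis. The class `InMemoryLock` is hypothetical and is not the project's `cacheService.tryLock`; against a real Redis server the equivalent acquire step is the command `SET key token NX PX ttl`.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

final class InMemoryLock {
    // A lock entry: the holder's token and the time at which the lock expires.
    private static final class Entry {
        final String token;
        final long expiresAt;

        Entry(String token, long expiresAt) {
            this.token = token;
            this.expiresAt = expiresAt;
        }
    }

    private final ConcurrentHashMap<String, Entry> store = new ConcurrentHashMap<>();

    // Returns a token if the lock was acquired, or null if another holder owns an unexpired lock.
    String tryLock(String key, long ttlMillis, long nowMillis) {
        String token = UUID.randomUUID().toString();
        Entry mine = new Entry(token, nowMillis + ttlMillis);
        // Atomically install our entry only if the key is absent or its lock has expired.
        Entry winner = store.compute(key, (k, current) ->
                (current == null || current.expiresAt <= nowMillis) ? mine : current);
        return winner == mine ? token : null;
    }

    // Releases the lock only if the caller still holds it.
    boolean unlock(String key, String token) {
        Entry current = store.get(key);
        return current != null && current.token.equals(token) && store.remove(key, current);
    }
}
```

The expiry matters: if a holder crashes before deleting the lock, the key would otherwise block every other client forever. The token check in `unlock` keeps a slow client from deleting a lock that has since expired and been taken by someone else.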

Modify the scheduled refresh method for future data as follows:

/**
 * Scheduled refresh of future data.
 */
@Scheduled(cron = "0 */1 * * * ?")
public void refresh() {

    String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30);
    if (StringUtils.isNotBlank(token)) {
        log.info("Scheduled refresh of future data: job started");

        // Get the keys of all future-data sets
        Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");
        for (String futureKey : futureKeys) { // e.g. future_100_50

            // Derive the topic key for this future key
            String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];

            // Query the tasks by key and score that are due now
            Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());

            // Move the data
            if (!tasks.isEmpty()) {
                cacheService.refreshWithPipeline(futureKey, topicKey, tasks);
                log.info("Moved the due tasks under " + futureKey + " to " + topicKey);
            }
        }
    }
}

Synchronizing the database to Redis

@Scheduled(cron = "0 */5 * * * ?")
@PostConstruct
public void reloadData() {
    clearCache();
    log.info("Synchronizing database data to the cache");
    Calendar calendar = Calendar.getInstance();
    calendar.add(Calendar.MINUTE, 5);

    // Load every task that is due within the next 5 minutes
    List<Taskinfo> allTasks = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime, calendar.getTime()));
    if (allTasks != null && !allTasks.isEmpty()) {
        for (Taskinfo taskinfo : allTasks) {
            Task task = new Task();
            BeanUtils.copyProperties(taskinfo, task);
            task.setExecuteTime(taskinfo.getExecuteTime().getTime());
            addTaskToCache(task);
        }
    }
}

private void clearCache() {
    // Delete all keys of the future-data sets and of the current consumer queues
    Set<String> futurekeys = cacheService.scan(ScheduleConstants.FUTURE + "*"); // future_
    Set<String> topickeys = cacheService.scan(ScheduleConstants.TOPIC + "*"); // topic_
    cacheService.delete(futurekeys);
    cacheService.delete(topickeys);
}

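Publishing articles at a precise time with the delay queue

Exposing the delay-queue service's external interface

Provide a remote Feign interface by adding the following class to heima-leadnews-feign-api: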

package com.heima.apis.schedule;

import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.schedule.dtos.Task;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;

@FeignClient("leadnews-schedule")
public interface IScheduleClient {

    /**
     * Add a task.
     *
     * @param task the task object
     * @return the task ID
     */
    @PostMapping("/api/v1/task/add")
    public ResponseResult addTask(@RequestBody Task task);

    /**
     * Cancel a task.
     *
     * @param taskId the task ID
     * @return the result of the cancellation
     */
    @GetMapping("/api/v1/task/cancel/{taskId}")
    public ResponseResult cancelTask(@PathVariable("taskId") long taskId);

    /**
     * Pull a task by type and priority.
     *
     * @param type     the task type
     * @param priority the task priority
     * @return the pulled task, if any
     */
    @GetMapping("/api/v1/task/poll/{type}/{priority}")
    public ResponseResult poll(@PathVariable("type") int type, @PathVariable("priority") int priority);
}

Provide the corresponding implementation in the heima-leadnews-schedule microservice:

package com.heima.schedule.feign;

import com.heima.apis.schedule.IScheduleClient;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.schedule.dtos.Task;
import com.heima.schedule.service.TaskService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;


@RestController
public class ScheduleClient implements IScheduleClient {

    @Autowired
    private TaskService taskService;

    /**
     * Add a task.
     *
     * @param task the task object
     * @return the task ID
     */
    @PostMapping("/api/v1/task/add")
    @Override
    public ResponseResult addTask(@RequestBody Task task) {
        return ResponseResult.okResult(taskService.addTask(task));
    }

    /**
     * Cancel a task.
     *
     * @param taskId the task ID
     * @return the result of the cancellation
     */
    @GetMapping("/api/v1/task/cancel/{taskId}")
    @Override
    public ResponseResult cancelTask(@PathVariable("taskId") long taskId) {
        return ResponseResult.okResult(taskService.cancelTask(taskId));
    }

    /**
     * Pull a task by type and priority.
     *
     * @param type     the task type
     * @param priority the task priority
     * @return the pulled task, if any
     */
    @GetMapping("/api/v1/task/poll/{type}/{priority}")
    @Override
    public ResponseResult poll(@PathVariable("type") int type, @PathVariable("priority") int priority) {
        return ResponseResult.okResult(taskService.poll(type, priority));
    }
}

Integrating the delay-queue interface into article publishing

Create WmNewsTaskService:

package com.heima.wemedia.service;

import java.util.Date;


public interface WmNewsTaskService {

    /**
     * Add a task to the delay queue.
     *
     * @param id          the article ID
     * @param publishTime the publish time, used as the task's execution time
     */
    void addNewsToTask(Integer id, Date publishTime);


}

Implementation:

package com.heima.wemedia.service.impl;

import com.heima.apis.schedule.IScheduleClient;
import com.heima.model.common.enums.TaskTypeEnum;
import com.heima.model.schedule.dtos.Task;
import com.heima.model.wemedia.pojos.WmNews;
import com.heima.utils.common.ProtostuffUtil;
import com.heima.wemedia.service.WmNewsTaskService;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.util.Date;


@Service
@Slf4j
public class WmNewsTaskServiceImpl implements WmNewsTaskService {


    @Autowired
    private IScheduleClient scheduleClient;

    /**
     * Add a task to the delay queue.
     *
     * @param id          the article ID
     * @param publishTime the publish time, used as the task's execution time
     */
    @Override
    @Async
    public void addNewsToTask(Integer id, Date publishTime) {

        log.info("Adding task to the delay service: begin");

        Task task = new Task();
        task.setExecuteTime(publishTime.getTime());
        task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType());
        task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
        WmNews wmNews = new WmNews();
        wmNews.setId(id);
        task.setParameters(ProtostuffUtil.serialize(wmNews));

        scheduleClient.addTask(task);

        log.info("Adding task to the delay service: end");

    }

}

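Comparing serialization tools

  • JdkSerialize: Java's built-in serialization can serialize and deserialize any object that implements the Serializable interface; the writeObject() method of ObjectOutputStream serializes an object into a byte array
  • Protostuff: an open-source library built on Google's Protocol Buffers; it uses a more compact binary encoding and performs better. It can generate POJO classes with its compiler, or, with the protostuff-runtime dependency used here, work directly on existing POJOs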
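
For reference, the JDK side of the comparison can be sketched with the standard library alone. This is a minimal round trip, assuming nothing beyond `java.io`; the class `JdkSerializeDemo` and its `Payload` type are hypothetical and are not the utility classes copied from the course materials.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

final class JdkSerializeDemo {
    // Example payload; any class implementing Serializable works the same way.
    static final class Payload implements Serializable {
        private static final long serialVersionUID = 1L;
        final int id;

        Payload(int id) {
            this.id = id;
        }
    }

    // Serializes an object into a byte array with ObjectOutputStream.writeObject().
    static byte[] serialize(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Restores an object of the expected type from the byte array.
    static <T> T deserialize(byte[] data, Class<T> type) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return type.cast(in.readObject());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The JDK stream embeds class metadata alongside the field values, which is one reason its output tends to be larger than a schema-based encoding for small objects such as a task parameter holding only an ID.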

Copy the two classes from the course materials into heima-leadnews-utils.

Protostuff requires the following dependencies:

<dependency>
    <groupId>io.protostuff</groupId>
    <artifactId>protostuff-core</artifactId>
    <version>1.6.0</version>
</dependency>

<dependency>
    <groupId>io.protostuff</groupId>
    <artifactId>protostuff-runtime</artifactId>
    <version>1.6.0</version>
</dependency>

Modify the article publishing code:

Replace the earlier asynchronous call with a call that adds a delayed task.

@Autowired
private WmNewsTaskService wmNewsTaskService;

/**
 * Publish or update an article, or save it as a draft.
 *
 * @param dto article data
 * @return the result of the operation
 */
@Override
public ResponseResult submitNews(WmNewsDto dto) {

    // 0. Validate the parameters
    if (dto == null || dto.getContent() == null) {
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    }

    // 1. Save or update the article

    WmNews wmNews = new WmNews();
    // Copy properties (only those with the same name and type are copied)
    BeanUtils.copyProperties(dto, wmNews);
    // Cover images: list ---> string
    if (dto.getImages() != null && dto.getImages().size() > 0) {
        // [1dddfsd.jpg,sdlfjldk.jpg] --> 1dddfsd.jpg,sdlfjldk.jpg
        String imageStr = StringUtils.join(dto.getImages(), ",");
        wmNews.setImages(imageStr);
    }
    // If the cover type is automatic (-1)
    if (dto.getType().equals(WemediaConstants.WM_NEWS_TYPE_AUTO)) {
        wmNews.setType(null);
    }

    saveOrUpdateWmNews(wmNews);

    // 2. If this is a draft, stop here
    if (dto.getStatus().equals(WmNews.Status.NORMAL.getCode())) {
        return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
    }

    // 3. Not a draft: save the relation between the content images and the materials
    // Extract the images from the article content
    List<String> materials = ectractUrlInfo(dto.getContent());
    saveRelativeInfoForContent(materials, wmNews.getId());

    // 4. Not a draft: save the relation between the cover images and the materials;
    //    if the layout is automatic, the cover images have to be matched
    saveRelativeInfoForCover(dto, wmNews, materials);

    // Review the article: the earlier direct call is replaced by a delayed task
    // wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
    wmNewsTaskService.addNewsToTask(wmNews.getId(), wmNews.getPublishTime());

    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);

}

Consuming tasks to review articles

Add the following method to WmNewsTaskService:

/**
 * Consume the data in the delay queue.
 */
void scanNewsByTask();

Implementation:

@Autowired
private WmNewsAutoScanServiceImpl wmNewsAutoScanService;

/**
 * Consume the data in the delay queue.
 */
@Scheduled(fixedRate = 1000)
@Override
@SneakyThrows
public void scanNewsByTask() {

    log.info("Article review: consuming task, begin");

    ResponseResult responseResult = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
    if (responseResult.getCode().equals(200) && responseResult.getData() != null) {
        String json_str = JSON.toJSONString(responseResult.getData());
        Task task = JSON.parseObject(json_str, Task.class);
        byte[] parameters = task.getParameters();
        WmNews wmNews = ProtostuffUtil.deserialize(parameters, WmNews.class);
        log.info("Reviewing article {}", wmNews.getId());
        wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
    }
    log.info("Article review: consuming task, end");
}

Add the @EnableScheduling annotation to WemediaApplication, the bootstrap class of the we-media service, to enable task scheduling.
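
The consumer polls once per second and only acts when a task whose execution time has arrived comes back. As a rough in-memory illustration of that "return only what is due" behaviour, the sketch below uses the JDK's DelayQueue as a stand-in for the remote schedule service. The names DelayedReview and ReviewQueueSketch are hypothetical and not part of the project; how the real schedule service stores tasks is not shown in this section.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a scheduled review task; not part of the project. */
final class DelayedReview implements Delayed {
    final int newsId;
    final long executeAtMillis;

    DelayedReview(int newsId, long executeAtMillis) {
        this.newsId = newsId;
        this.executeAtMillis = executeAtMillis;
    }

    // Remaining delay; zero or negative means the task is due
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(executeAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    // Order tasks by how soon they become due
    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }
}

/** Hypothetical in-memory queue that mimics polling for due tasks. */
final class ReviewQueueSketch {
    private final DelayQueue<DelayedReview> queue = new DelayQueue<>();

    void add(int newsId, long executeAtMillis) {
        queue.put(new DelayedReview(newsId, executeAtMillis));
    }

    /** Non-blocking: returns the id of a due task, or null if nothing is due yet. */
    Integer pollDue() {
        DelayedReview due = queue.poll();
        return due == null ? null : due.newsId;
    }
}
```

A scheduled method would call pollDue() on every tick and start the review only when it returns a non-null id.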

Stage 5: Kafka and asynchronous notification when articles are put up or taken down

Putting we-media articles up or taking them down: implementation

Add an enable property to WmNewsDto. The complete code is as follows:

package com.heima.model.wemedia.dtos;

import lombok.Data;

import java.util.Date;
import java.util.List;

@Data
public class WmNewsDto {

    private Integer id;
    /**
     * Title
     */
    private String title;
    /**
     * Channel id
     */
    private Integer channelId;
    /**
     * Labels
     */
    private String labels;
    /**
     * Publish time
     */
    private Date publishTime;
    /**
     * Article content
     */
    private String content;
    /**
     * Cover type: 0 no image, 1 single image, 3 multiple images, -1 automatic
     */
    private Short type;
    /**
     * Submission time
     */
    private Date submitedTime;
    /**
     * Status: 1 submitted, 0 draft
     */
    private Short status;

    /**
     * Cover image list; multiple images are joined with commas when stored
     */
    private List<String> images;

    /**
     * Up/down flag: 0 taken down, 1 put up
     */
    private Short enable;
}

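9.4.2) Service layer

Add a method to WmNewsService:

/**
 * Put an article up or take it down.
 * @param dto carries the article id and the target enable flag
 * @return the operation result
 */
public ResponseResult downOrUp(WmNewsDto dto);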

Implementation:

/**
 * Put an article up or take it down.
 * @param dto carries the article id and the target enable flag
 * @return the operation result
 */
@Override
public ResponseResult downOrUp(WmNewsDto dto) {
    // 1. Validate the parameters (also guards against a missing request body)
    if (dto == null || dto.getId() == null) {
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    }

    // 2. Look up the article
    WmNews wmNews = getById(dto.getId());
    if (wmNews == null) {
        return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST, "The article does not exist");
    }

    // 3. Only published articles can be put up or taken down
    if (!wmNews.getStatus().equals(WmNews.Status.PUBLISHED.getCode())) {
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID, "The article is not published, so it cannot be put up or taken down");
    }

    // 4. Update the enable flag (0 = taken down, 1 = put up)
    if (dto.getEnable() != null && dto.getEnable() > -1 && dto.getEnable() < 2) {
        update(Wrappers.<WmNews>lambdaUpdate().set(WmNews::getEnable, dto.getEnable())
                .eq(WmNews::getId, wmNews.getId()));
    }
    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}

9.4.3) Controller

@PostMapping("/down_or_up")
public ResponseResult downOrUp(@RequestBody WmNewsDto dto){
return wmNewsService.downOrUp(dto);
}

Notifying the article service when an article is put up or taken down

Import the Kafka dependencies in the heima-leadnews-common module:

<!-- kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>

9.5.2) Configure the Kafka producer in the Nacos configuration center for the we-media service

spring:
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

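9.5.3) Send a message from the we-media service after an article is put up or taken down

// Send a message to tell the article service to update the article configuration
// (kafkaTemplate is assumed to be an injected KafkaTemplate<String, String>)
if (wmNews.getArticleId() != null) {
    Map<String, Object> map = new HashMap<>();
    map.put("articleId", wmNews.getArticleId());
    map.put("enable", dto.getEnable());
    kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC, JSON.toJSONString(map));
}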

Constants class:

public class WmNewsMessageConstants {

public static final String WM_NEWS_UP_OR_DOWN_TOPIC="wm.news.up.or.down.topic";
}

9.5.4) Configure the Kafka consumer in the Nacos configuration center for the article service

spring:
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    consumer:
      group-id: ${spring.application.name}
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

9.5.5) Write a listener in the article service to receive the data

package com.heima.article.listener;

import com.alibaba.fastjson.JSON;
import com.heima.article.service.ApArticleConfigService;
// Assumed package: the constants class is shown without a package line,
// so this follows the other constants classes (com.heima.common.constants)
import com.heima.common.constants.WmNewsMessageConstants;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
@Slf4j
public class ArtilceIsDownListener {

    @Autowired
    private ApArticleConfigService apArticleConfigService;

    @KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
    public void onMessage(String message) {
        if (StringUtils.isNotBlank(message)) {
            // The payload is a JSON object with the keys "articleId" and "enable"
            Map map = JSON.parseObject(message, Map.class);
            apArticleConfigService.updateByMap(map);
            log.info("Article configuration updated on the article side, articleId={}", map.get("articleId"));
        }
    }
}

9.5.6) Update the data in the ap_article_config table

Create ApArticleConfigService:

package com.heima.article.service;

import com.baomidou.mybatisplus.extension.service.IService;
import com.heima.model.article.pojos.ApArticleConfig;

import java.util.Map;

public interface ApArticleConfigService extends IService<ApArticleConfig> {

/**
 * Update the article configuration.
 * @param map message payload with the keys "articleId" and "enable"
 */
public void updateByMap(Map map);
}

Implementation class:

package com.heima.article.service.impl;

import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.heima.article.mapper.ApArticleConfigMapper;
import com.heima.article.service.ApArticleConfigService;
import com.heima.model.article.pojos.ApArticleConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.Map;

@Service
@Slf4j
@Transactional
public class ApArticleConfigServiceImpl extends ServiceImpl<ApArticleConfigMapper, ApArticleConfig> implements ApArticleConfigService {


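/**
 * Update the article configuration.
 * @param map message payload with the keys "articleId" and "enable"
 */
@Override
public void updateByMap(Map map) {
    // enable: 0 = taken down, 1 = put up
    Object enable = map.get("enable");
    // Anything other than a numeric 1 (including a missing value) is treated as "down";
    // comparing through Number avoids depending on the numeric type the JSON parser produced
    boolean isDown = !(enable instanceof Number && ((Number) enable).intValue() == 1);
    // Update the article configuration
    update(Wrappers.<ApArticleConfig>lambdaUpdate()
            .eq(ApArticleConfig::getArticleId, map.get("articleId"))
            .set(ApArticleConfig::getIsDown, isDown));
}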
}
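
Because the enable value is read from a Map as a plain Object, its runtime type depends on how the JSON library deserialised the message, and a check such as equals(1) only holds when that type is Integer. The following is a minimal sketch of a more tolerant mapping from enable to the isDown flag. EnableFlagSketch is a hypothetical helper, and treating a missing value as "down" is an assumption made for this illustration.

```java
/** Hypothetical helper; not part of the project. */
final class EnableFlagSketch {

    /** Returns true when the article should be marked as taken down (enable is not 1). */
    static boolean isDown(Object enable) {
        if (enable instanceof Number) {
            // Covers Integer, Short, Long and so on
            return ((Number) enable).intValue() != 1;
        }
        if (enable instanceof String) {
            return !"1".equals(((String) enable).trim());
        }
        // Missing or unrecognised value: assume the article stays down
        return true;
    }
}
```

With this helper, EnableFlagSketch.isDown(map.get("enable")) gives the same answer whether the parser produced an Integer, a Short or a String.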

Stage 6: App-side article search

Query all article data and bulk-import it into the ES index:

package com.heima.es;

import com.alibaba.fastjson.JSON;
import com.heima.es.mapper.ApArticleMapper;
import com.heima.es.pojo.SearchArticleVo;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.List;


@SpringBootTest
@RunWith(SpringRunner.class)
public class ApArticleTest {

@Autowired
private ApArticleMapper apArticleMapper;

@Autowired
private RestHighLevelClient restHighLevelClient;


/**
 * Note: if the data set is large, import it page by page.
 * @throws Exception if the query or the bulk request fails
 */
@Test
public void init() throws Exception {

    // 1. Query all articles that meet the conditions
    List<SearchArticleVo> searchArticleVos = apArticleMapper.loadArticleList();

    // 2. Bulk-import them into the ES index
    BulkRequest bulkRequest = new BulkRequest("app_info_article");

    for (SearchArticleVo searchArticleVo : searchArticleVos) {

        IndexRequest indexRequest = new IndexRequest().id(searchArticleVo.getId().toString())
                .source(JSON.toJSONString(searchArticleVo), XContentType.JSON);

        // Add the document to the bulk request
        bulkRequest.add(indexRequest);
    }
    restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
}

}
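
The note about importing large data sets page by page can be illustrated with a small batching helper. This is only a sketch under the assumption that the full list fits in memory and just the bulk requests need to be split. BatchSketch and partition are hypothetical names, and a real import might page at the database query instead.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper; not part of the project. */
final class BatchSketch {

    /** Splits items into consecutive batches of at most batchSize elements. */
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < items.size(); from += batchSize) {
            int to = Math.min(from + batchSize, items.size());
            // Copy the view so each batch is independent of the source list
            batches.add(new ArrayList<>(items.subList(from, to)));
        }
        return batches;
    }
}
```

Each batch would then be turned into its own bulk request, so that no single request grows without bound.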

Implementing article search

Setting up the search microservice

Implementation class:

package com.heima.search.service.impl;

import com.alibaba.fastjson.JSON;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.common.enums.AppHttpCodeEnum;
import com.heima.model.search.dtos.UserSearchDto;
import com.heima.model.user.pojos.ApUser;
import com.heima.search.service.ArticleSearchService;
import com.heima.utils.thread.AppThreadLocalUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.index.query.*;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

@Service
@Slf4j
public class ArticleSearchServiceImpl implements ArticleSearchService {

@Autowired
private RestHighLevelClient restHighLevelClient;

/**
 * Paged article search against ES.
 *
 * @param dto the search request
 * @return the matching articles
 */
@Override
public ResponseResult search(UserSearchDto dto) throws IOException {

    // 1. Validate the parameters
    if (dto == null || StringUtils.isBlank(dto.getSearchWords())) {
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    }

    // 2. Build the query
    SearchRequest searchRequest = new SearchRequest("app_info_article");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

    // Boolean query
    BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();

    // Analyze the keywords and match them against title and content
    QueryStringQueryBuilder queryStringQueryBuilder = QueryBuilders.queryStringQuery(dto.getSearchWords()).field("title").field("content").defaultOperator(Operator.OR);
    boolQueryBuilder.must(queryStringQueryBuilder);

    // Only return articles published before minBehotTime (skipped when no time is supplied)
    if (dto.getMinBehotTime() != null) {
        RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("publishTime").lt(dto.getMinBehotTime().getTime());
        boolQueryBuilder.filter(rangeQueryBuilder);
    }

    // Paging: always start at 0, because the time filter above moves the window
    searchSourceBuilder.from(0);
    searchSourceBuilder.size(dto.getPageSize());

    // Sort by publish time, newest first
    searchSourceBuilder.sort("publishTime", SortOrder.DESC);

    // Highlight matches in the title
    HighlightBuilder highlightBuilder = new HighlightBuilder();
    highlightBuilder.field("title");
    highlightBuilder.preTags("<font style='color: red; font-size: inherit;'>");
    highlightBuilder.postTags("</font>");
    searchSourceBuilder.highlighter(highlightBuilder);

    searchSourceBuilder.query(boolQueryBuilder);
    searchRequest.source(searchSourceBuilder);
    SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);

    // 3. Wrap the results and return them
    List<Map> list = new ArrayList<>();

    SearchHit[] hits = searchResponse.getHits().getHits();
    for (SearchHit hit : hits) {
        String json = hit.getSourceAsString();
        Map map = JSON.parseObject(json, Map.class);
        // Handle highlighting
        if (hit.getHighlightFields() != null && hit.getHighlightFields().size() > 0) {
            Text[] titles = hit.getHighlightFields().get("title").getFragments();
            String title = StringUtils.join(titles);
            // Highlighted title
            map.put("h_title", title);
        } else {
            // Original title
            map.put("h_title", map.get("title"));
        }
        list.add(map);
    }

    return ResponseResult.okResult(list);
}
}
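
The search always reads from offset 0 and moves through the results with the filter on publishTime instead of an increasing offset. A minimal in-memory sketch of that time-cursor paging is shown below. TimeCursorPagingSketch is a hypothetical name, and the assumption that the client sends the oldest publish time of the previous page as the next minBehotTime is not stated in this section.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical illustration; not part of the project. */
final class TimeCursorPagingSketch {

    /** Returns up to pageSize timestamps strictly older than the cursor, newest first. */
    static List<Long> nextPage(List<Long> publishTimes, long minBehotTime, int pageSize) {
        return publishTimes.stream()
                .filter(t -> t < minBehotTime)      // same role as the range filter
                .sorted(Comparator.reverseOrder())  // same role as sorting by publishTime descending
                .limit(pageSize)                    // same role as size(pageSize) with from(0)
                .collect(Collectors.toList());
    }
}
```

Calling nextPage again with the last timestamp of the previous page as the cursor yields the following page without skipping or repeating entries, provided publish times are distinct.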

Building the index during automatic article review

The complete code is as follows:

package com.heima.article.service.impl;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.heima.article.mapper.ApArticleContentMapper;
import com.heima.article.service.ApArticleService;
import com.heima.article.service.ArticleFreemarkerService;
import com.heima.common.constants.ArticleConstants;
import com.heima.file.service.FileStorageService;
import com.heima.model.article.pojos.ApArticle;
import com.heima.model.search.vos.SearchArticleVo;
import freemarker.template.Configuration;
import freemarker.template.Template;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.Map;

@Service
@Slf4j
@Transactional
public class ArticleFreemarkerServiceImpl implements ArticleFreemarkerService {

@Autowired
private ApArticleContentMapper apArticleContentMapper;

@Autowired
private Configuration configuration;

@Autowired
private FileStorageService fileStorageService;

@Autowired
private ApArticleService apArticleService;

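/**
 * Generate the static HTML file and upload it to MinIO.
 * @param apArticle the saved article
 * @param content the article content as a JSON array string
 */
@Async
@Override
public void buildArticleToMinIO(ApArticle apArticle, String content) {
    // The article id is already known at this point
    // 4.1 Check the article content
    if (StringUtils.isNotBlank(content)) {
        // 4.2 Render the article content to an HTML file with FreeMarker
        StringWriter out = new StringWriter();
        try {
            Template template = configuration.getTemplate("article.ftl");
            // Data model
            Map<String, Object> contentDataModel = new HashMap<>();
            contentDataModel.put("content", JSONArray.parseArray(content));
            // Merge the template with the data model
            template.process(contentDataModel, out);
        } catch (Exception e) {
            // Stop here so that an empty page is not uploaded when rendering fails
            log.error("Failed to render the static page for article id={}", apArticle.getId(), e);
            return;
        }

        // 4.3 Upload the HTML file to MinIO (bytes encoded explicitly as UTF-8)
        InputStream in = new ByteArrayInputStream(out.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));
        String path = fileStorageService.uploadHtmlFile("", apArticle.getId() + ".html", in);

        // 4.4 Update the ap_article table and save the static_url field
        apArticleService.update(Wrappers.<ApArticle>lambdaUpdate().eq(ApArticle::getId, apArticle.getId())
                .set(ApArticle::getStaticUrl, path));

        // Send a message so that the index is created
        createArticleESIndex(apArticle, content, path);
    }
}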

@Autowired
private KafkaTemplate<String,String> kafkaTemplate;

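/**
 * Send a message so that the index is created.
 * @param apArticle the saved article
 * @param content the article content
 * @param path the URL of the static page
 */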
private void createArticleESIndex(ApArticle apArticle, String content, String path) {
SearchArticleVo vo = new SearchArticleVo();
BeanUtils.copyProperties(apArticle,vo);
vo.setContent(content);
vo.setStaticUrl(path);

kafkaTemplate.send(ArticleConstants.ARTICLE_ES_SYNC_TOPIC, JSON.toJSONString(vo));
}

}

Add the new constant to the ArticleConstants class. The complete code is as follows:

package com.heima.common.constants;

public class ArticleConstants {
public static final Short LOADTYPE_LOAD_MORE = 1;
public static final Short LOADTYPE_LOAD_NEW = 2;
public static final String DEFAULT_TAG = "__all__";

public static final String ARTICLE_ES_SYNC_TOPIC = "article.es.sync.topic";

public static final Integer HOT_ARTICLE_LIKE_WEIGHT = 3;
public static final Integer HOT_ARTICLE_COMMENT_WEIGHT = 5;
public static final Integer HOT_ARTICLE_COLLECTION_WEIGHT = 8;

public static final String HOT_ARTICLE_FIRST_PAGE = "hot_article_first_page_";
}

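3. Integrate Kafka into the article microservice to send messages

Add the following configuration to the Nacos configuration center for the article microservice (Spring Boot reads these keys under spring.kafka):

spring:
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer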

The search microservice receives the message and creates the index

1. Add the Kafka configuration to the search microservice. The Nacos configuration is as follows:

spring:
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    consumer:
      group-id: ${spring.application.name}
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

2. Define a listener that receives the message and saves the index data

package com.heima.search.listener;

import com.alibaba.fastjson.JSON;
import com.heima.common.constants.ArticleConstants;
import com.heima.model.search.vos.SearchArticleVo;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
@Slf4j
public class SyncArticleListener {

@Autowired
private RestHighLevelClient restHighLevelClient;

@KafkaListener(topics = ArticleConstants.ARTICLE_ES_SYNC_TOPIC)
public void onMessage(String message) {
    if (StringUtils.isNotBlank(message)) {

        log.info("SyncArticleListener,message={}", message);

        SearchArticleVo searchArticleVo = JSON.parseObject(message, SearchArticleVo.class);
        IndexRequest indexRequest = new IndexRequest("app_info_article");
        // Use the article id as the document id, so a repeated message overwrites the same document
        indexRequest.id(searchArticleVo.getId().toString());
        indexRequest.source(message, XContentType.JSON);
        try {
            restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            // Pass the exception as the last argument so the stack trace is logged
            log.error("sync es error", e);
        }
    }
}
}

App-side search: search history

The complete code is as follows:

package com.heima.search.service.impl;

import com.alibaba.fastjson.JSON;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.common.enums.AppHttpCodeEnum;
import com.heima.model.search.dtos.UserSearchDto;
import com.heima.model.user.pojos.ApUser;
import com.heima.search.service.ApUserSearchService;
import com.heima.search.service.ArticleSearchService;
import com.heima.utils.thread.AppThreadLocalUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.index.query.*;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

@Service
@Slf4j
public class ArticleSearchServiceImpl implements ArticleSearchService {

@Autowired
private RestHighLevelClient restHighLevelClient;

@Autowired
private ApUserSearchService apUserSearchService;

/**
 * Paged article search against ES.
 *
 * @param dto the search request
 * @return the matching articles
 */
@Override
public ResponseResult search(UserSearchDto dto) throws IOException {

    // 1. Validate the parameters
    if (dto == null || StringUtils.isBlank(dto.getSearchWords())) {
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    }

    ApUser user = AppThreadLocalUtil.getUser();

    // Asynchronous call: save the search record (only for a logged-in user on the first page)
    if (user != null && dto.getFromIndex() == 0) {
        apUserSearchService.insert(dto.getSearchWords(), user.getId());
    }

    // 2. Build the query
    SearchRequest searchRequest = new SearchRequest("app_info_article");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

    // Boolean query
    BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();

    // Analyze the keywords and match them against title and content
    QueryStringQueryBuilder queryStringQueryBuilder = QueryBuilders.queryStringQuery(dto.getSearchWords()).field("title").field("content").defaultOperator(Operator.OR);
    boolQueryBuilder.must(queryStringQueryBuilder);

    // Only return articles published before minBehotTime (skipped when no time is supplied)
    if (dto.getMinBehotTime() != null) {
        RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("publishTime").lt(dto.getMinBehotTime().getTime());
        boolQueryBuilder.filter(rangeQueryBuilder);
    }

    // Paging: always start at 0, because the time filter above moves the window
    searchSourceBuilder.from(0);
    searchSourceBuilder.size(dto.getPageSize());

    // Sort by publish time, newest first
    searchSourceBuilder.sort("publishTime", SortOrder.DESC);

    // Highlight matches in the title
    HighlightBuilder highlightBuilder = new HighlightBuilder();
    highlightBuilder.field("title");
    highlightBuilder.preTags("<font style='color: red; font-size: inherit;'>");
    highlightBuilder.postTags("</font>");
    searchSourceBuilder.highlighter(highlightBuilder);

    searchSourceBuilder.query(boolQueryBuilder);
    searchRequest.source(searchSourceBuilder);
    SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);

    // 3. Wrap the results and return them
    List<Map> list = new ArrayList<>();

    SearchHit[] hits = searchResponse.getHits().getHits();
    for (SearchHit hit : hits) {
        String json = hit.getSourceAsString();
        Map map = JSON.parseObject(json, Map.class);
        // Handle highlighting
        if (hit.getHighlightFields() != null && hit.getHighlightFields().size() > 0) {
            Text[] titles = hit.getHighlightFields().get("title").getFragments();
            String title = StringUtils.join(titles);
            // Highlighted title
            map.put("h_title", title);
        } else {
            // Original title
            map.put("h_title", map.get("title"));
        }
        list.add(map);
    }

    return ResponseResult.okResult(list);
}
}

Loading the search history list

package com.heima.search.controller.v1;

import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.search.dtos.UserSearchDto;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/v1/associate")
public class ApAssociateWordsController {


@PostMapping("/search")
public ResponseResult search(@RequestBody UserSearchDto userSearchDto) {
return null;
}
}

Service layer

Create the associate-words service interface:

package com.heima.search.service;

import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.search.dtos.UserSearchDto;

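/**
 * <p>
 * Associate-words table: service interface
 * </p>
 *
 * @author itheima
 */
public interface ApAssociateWordsService {

    /**
     * Find associate (suggestion) words.
     * @param userSearchDto the search request
     * @return the matching associate words
     */
    ResponseResult findAssociate(UserSearchDto userSearchDto);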

}

Implementation class:

package com.heima.search.service.impl;

import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.common.enums.AppHttpCodeEnum;
import com.heima.model.search.dtos.UserSearchDto;
import com.heima.search.pojos.ApAssociateWords;
import com.heima.search.service.ApAssociateWordsService;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Service;

import java.util.List;

/**
* @Description:
* @Version: V1.0
*/
@Service
public class ApAssociateWordsServiceImpl implements ApAssociateWordsService {

@Autowired
MongoTemplate mongoTemplate;

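/**
 * Find associate (suggestion) words.
 * @param userSearchDto the search request
 * @return the matching associate words
 */
@Override
public ResponseResult findAssociate(UserSearchDto userSearchDto) {
    // 1. Validate the parameters
    if (userSearchDto == null || StringUtils.isBlank(userSearchDto.getSearchWords())) {
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    }
    // 2. Cap the page size
    if (userSearchDto.getPageSize() > 20) {
        userSearchDto.setPageSize(20);
    }

    // 3. Run a fuzzy (substring) query. The user input is quoted so that it is matched literally;
    //    a single leading backslash would escape only the first character and turn letters such as
    //    "d" or "w" into character classes
    String literal = java.util.regex.Pattern.quote(userSearchDto.getSearchWords());
    Query query = Query.query(Criteria.where("associateWords").regex(".*" + literal + ".*"));
    query.limit(userSearchDto.getPageSize());
    List<ApAssociateWords> wordsList = mongoTemplate.find(query, ApAssociateWords.class);

    return ResponseResult.okResult(wordsList);
}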
}
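
The fuzzy query builds a regular expression from user input, so any regex metacharacters in the search words need to be neutralised. The sketch below is an in-memory stand-in for the MongoDB query that shows the effect of quoting the input with Pattern.quote. AssociateMatchSketch is a hypothetical name, and the word list is passed in directly rather than read from a collection.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Hypothetical illustration; not part of the project. */
final class AssociateMatchSketch {

    /** Returns up to limit words that contain searchWords as a literal substring. */
    static List<String> findAssociate(List<String> words, String searchWords, int limit) {
        // Pattern.quote wraps the input so that characters like + or \d lose their regex meaning
        Pattern containsLiteral = Pattern.compile(".*" + Pattern.quote(searchWords) + ".*");
        return words.stream()
                .filter(w -> containsLiteral.matcher(w).matches())
                .limit(limit)
                .collect(Collectors.toList());
    }
}
```

With quoting, a search for "d" matches only words that contain the letter d, and a search for "c++" is treated as plain text instead of a quantifier.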

Controller

Create the associate-words controller:

package com.heima.search.controller.v1;

import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.search.dtos.UserSearchDto;
import com.heima.search.service.ApAssociateWordsService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * <p>
 * Associate-words table: front controller
 * </p>
 * @author itheima
 */
@Slf4j
@RestController
@RequestMapping("/api/v1/associate")
public class ApAssociateWordsController{

@Autowired
private ApAssociateWordsService apAssociateWordsService;

@PostMapping("/search")
public ResponseResult findAssociate(@RequestBody UserSearchDto userSearchDto) {
return apAssociateWordsService.findAssociate(userSearchDto);
}
}

Stage 7: xxl-job distributed task scheduling

Service implementation class:

package com.heima.article.service.impl;

import com.alibaba.fastjson.JSON;
import com.heima.apis.wemedia.IWemediaClient;
import com.heima.article.mapper.ApArticleMapper;
import com.heima.article.service.HotArticleService;
import com.heima.common.constants.ArticleConstants;
import com.heima.common.redis.CacheService;
import com.heima.model.article.pojos.ApArticle;
import com.heima.model.article.vos.HotArticleVo;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.wemedia.pojos.WmChannel;
import lombok.extern.slf4j.Slf4j;
import org.joda.time.DateTime;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;

@Service
@Slf4j
@Transactional
public class HotArticleServiceImpl implements HotArticleService {

@Autowired
private ApArticleMapper apArticleMapper;

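/**
 * Compute the hot articles.
 */
@Override
public void computeHotArticle() {
    // 1. Query the articles of the last 5 days
    //    (the original called minusDays(50), which contradicts both this comment and the mapper method name)
    Date dateParam = DateTime.now().minusDays(5).toDate();
    List<ApArticle> apArticleList = apArticleMapper.findArticleListByLast5days(dateParam);

    // 2. Compute the score of each article
    List<HotArticleVo> hotArticleVoList = computeHotArticle(apArticleList);

    // 3. Cache the 30 highest-scoring articles for each channel
    cacheTagToRedis(hotArticleVoList);
}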

@Autowired
private IWemediaClient wemediaClient;

@Autowired
private CacheService cacheService;

/**
 * Cache the 30 highest-scoring articles for each channel.
 * @param hotArticleVoList the scored articles
 */
private void cacheTagToRedis(List<HotArticleVo> hotArticleVoList) {
    // Cache the 30 highest-scoring articles for each channel
    ResponseResult responseResult = wemediaClient.getChannels();
    if (responseResult.getCode().equals(200)) {
        String channelJson = JSON.toJSONString(responseResult.getData());
        List<WmChannel> wmChannels = JSON.parseArray(channelJson, WmChannel.class);
        // Pick out the articles of each channel
        if (wmChannels != null && wmChannels.size() > 0) {
            for (WmChannel wmChannel : wmChannels) {
                List<HotArticleVo> hotArticleVos = hotArticleVoList.stream().filter(x -> x.getChannelId().equals(wmChannel.getId())).collect(Collectors.toList());
                // Sort the articles and store the top 30 in Redis; key: prefix + channel id, value: the top 30 as JSON
                sortAndCache(hotArticleVos, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + wmChannel.getId());
            }
        }
    }

    // Recommendation data: sort all articles and store the top 30 in Redis under the default tag
    sortAndCache(hotArticleVoList, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + ArticleConstants.DEFAULT_TAG);
}

/**
 * Sort the articles by score and cache them.
 * @param hotArticleVos the articles to sort
 * @param key the Redis key
 */
private void sortAndCache(List<HotArticleVo> hotArticleVos, String key) {
    hotArticleVos = hotArticleVos.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
    if (hotArticleVos.size() > 30) {
        hotArticleVos = hotArticleVos.subList(0, 30);
    }
    cacheService.set(key, JSON.toJSONString(hotArticleVos));
}

/**
 * Compute the scores of the articles.
 * @param apArticleList the articles to score
 * @return the articles with their scores
 */
private List<HotArticleVo> computeHotArticle(List<ApArticle> apArticleList) {

List<HotArticleVo> hotArticleVoList = new ArrayList<>();

if(apArticleList != null && apArticleList.size() > 0){
for (ApArticle apArticle : apArticleList) {
HotArticleVo hot = new HotArticleVo();
BeanUtils.copyProperties(apArticle,hot);
Integer score = computeScore(apArticle);
hot.setScore(score);
hotArticleVoList.add(hot);
}
}
return hotArticleVoList;
}

/**
 * Compute the score of a single article.
 * @param apArticle the article
 * @return the score
 */
private Integer computeScore(ApArticle apArticle) {
    Integer score = 0;
    if (apArticle.getLikes() != null) {
        score += apArticle.getLikes() * ArticleConstants.HOT_ARTICLE_LIKE_WEIGHT;
    }
    if (apArticle.getViews() != null) {
        // A view counts with weight 1
        score += apArticle.getViews();
    }
    if (apArticle.getComment() != null) {
        score += apArticle.getComment() * ArticleConstants.HOT_ARTICLE_COMMENT_WEIGHT;
    }
    if (apArticle.getCollection() != null) {
        score += apArticle.getCollection() * ArticleConstants.HOT_ARTICLE_COLLECTION_WEIGHT;
    }

    return score;
}
}
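
The scoring rule is a weighted sum: views count once, likes three times, comments five times and collections eight times, using the weights defined in ArticleConstants. The sketch below reproduces that arithmetic together with the "sort descending and keep the top N" step on plain integers. HotScoreSketch is a hypothetical name and no Redis or database access is involved.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical illustration; not part of the project. */
final class HotScoreSketch {
    // Weights copied from ArticleConstants; a view counts as 1
    static final int LIKE_WEIGHT = 3;
    static final int COMMENT_WEIGHT = 5;
    static final int COLLECTION_WEIGHT = 8;

    /** Weighted sum of the counters; null counters are ignored. */
    static int score(Integer likes, Integer views, Integer comments, Integer collections) {
        int score = 0;
        if (likes != null) score += likes * LIKE_WEIGHT;
        if (views != null) score += views;
        if (comments != null) score += comments * COMMENT_WEIGHT;
        if (collections != null) score += collections * COLLECTION_WEIGHT;
        return score;
    }

    /** Returns the n highest scores, highest first. */
    static List<Integer> topN(List<Integer> scores, int n) {
        return scores.stream()
                .sorted(Comparator.reverseOrder())
                .limit(n)
                .collect(Collectors.toList());
    }
}
```

For example, an article with 2 likes, 10 views, 1 comment and 1 collection scores 2 × 3 + 10 + 1 × 5 + 1 × 8 = 29.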

Add the following annotation to the ArticleApplication bootstrap class:

@EnableFeignClients(basePackages = "com.heima.apis")

First, prepare some data in the database, then run the following test:

package com.heima.article.service.impl;

import com.heima.article.ArticleApplication;
import com.heima.article.service.HotArticleService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest(classes = ArticleApplication.class)
@RunWith(SpringRunner.class)
public class HotArticleServiceImplTest {

@Autowired
private HotArticleService hotArticleService;

@Test
public void computeHotArticle() {
hotArticleService.computeHotArticle();
}
}

Scheduled computation with xxl-job: steps

① Add the dependency to the pom file of heima-leadnews-article:

<!--xxl-job-->
<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
    <version>2.3.0</version>
</dependency>

② Create an executor and a task in xxl-job-admin

③ Integrate xxl-job into leadnews-article

XxlJobConfig

package com.heima.article.config;

import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* xxl-job config
*
* @author xuxueli 2017-04-28
*/
@Configuration
public class XxlJobConfig {
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

@Value("${xxl.job.admin.addresses}")
private String adminAddresses;

@Value("${xxl.job.executor.appname}")
private String appname;

@Value("${xxl.job.executor.port}")
private int port;


@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setPort(port);
return xxlJobSpringExecutor;
}


}

Add the following to the Nacos configuration:

xxl:
  job:
    admin:
      addresses: http://192.168.200.130:8888/xxl-job-admin
    executor:
      appname: leadnews-hot-article-executor
      port: 9999

④ Create the job class in the article microservice

package com.heima.article.job;

import com.heima.article.service.HotArticleService;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ComputeHotArticleJob {

@Autowired
private HotArticleService hotArticleService;

@XxlJob("computeHotArticleJob")
public void handle(){
log.info("Hot-article score computation job started...");
hotArticleService.computeHotArticle();
log.info("Hot-article score computation job finished...");

}
}

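Stage 8: Hot articles - real-time computation

User behaviors (views, comments, likes, collections) each send a message; reading and liking are used as the examples here.

① Integrate the Kafka producer configuration into the heima-leadnews-behavior microservice

Update the Nacos configuration and add the following: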

spring:
  application:
    name: leadnews-behavior
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

② Modify ApLikesBehaviorServiceImpl so that it sends a message

Define the message wrapper class UpdateArticleMess:

package com.heima.model.mess;

import lombok.Data;

@Data
public class UpdateArticleMess {

/**
* The article field to update
*/
private UpdateArticleType type;
/**
* Article ID
*/
private Long articleId;
/**
* Increment to apply; may be positive or negative
*/
private Integer add;

public enum UpdateArticleType{
COLLECTION,COMMENT,LIKES,VIEWS;
}
}

Topic constants class:

package com.heima.common.constants;

public class HotArticleConstants {

public static final String HOT_ARTICLE_SCORE_TOPIC="hot.article.score.topic";

}

The complete code is as follows:

package com.heima.behavior.service.impl;

import com.alibaba.fastjson.JSON;
import com.heima.behavior.service.ApLikesBehaviorService;
import com.heima.common.constants.BehaviorConstants;
import com.heima.common.constants.HotArticleConstants;
import com.heima.common.redis.CacheService;
import com.heima.model.behavior.dtos.LikesBehaviorDto;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.common.enums.AppHttpCodeEnum;
import com.heima.model.mess.UpdateArticleMess;
import com.heima.model.user.pojos.ApUser;
import com.heima.utils.thread.AppThreadLocalUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;


@Service
@Transactional
@Slf4j
public class ApLikesBehaviorServiceImpl implements ApLikesBehaviorService {

@Autowired
private CacheService cacheService;

@Autowired
private KafkaTemplate<String,String> kafkaTemplate;

@Override
public ResponseResult like(LikesBehaviorDto dto) {

//1. Validate the parameters
if (dto == null || dto.getArticleId() == null || checkParam(dto)) {
return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
}

//2. Check that the user is logged in
ApUser user = AppThreadLocalUtil.getUser();
if (user == null) {
return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);
}

UpdateArticleMess mess = new UpdateArticleMess();
mess.setArticleId(dto.getArticleId());
mess.setType(UpdateArticleMess.UpdateArticleType.LIKES);

//3. Like: save the data
if (dto.getOperation() == 0) {
Object obj = cacheService.hGet(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());
if (obj != null) {
return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID, "Already liked");
}
// Save the current key
log.info("Saving key: {}, {}, {}", dto.getArticleId(), user.getId(), dto);
cacheService.hPut(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString(), JSON.toJSONString(dto));
mess.setAdd(1);
} else {
// Delete the current key
log.info("Deleting key: {}, {}", dto.getArticleId(), user.getId());
cacheService.hDelete(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());
mess.setAdd(-1);
}

// Send the message for downstream aggregation
kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));


return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);

}

/**
* Validate the parameters
*
* @return true if the parameters are invalid
*/
private boolean checkParam(LikesBehaviorDto dto) {
if (dto.getType() > 2 || dto.getType() < 0 || dto.getOperation() > 1 || dto.getOperation() < 0) {
return true;
}
return false;
}
}
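
The like/unlike branch above decides which increment goes into the Kafka message. As a minimal sketch of that decision only, the class below replaces the Redis hash with an in-memory map; `LikeToggleSketch` and its `apply` method are hypothetical names introduced for illustration and are not part of the project.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;

// Hypothetical in-memory stand-in for the Redis hash used by the service above.
class LikeToggleSketch {
    // articleId -> (userId -> stored payload)
    private final Map<Long, Map<Long, String>> likes = new HashMap<>();

    /**
     * Returns the increment to publish: +1 for a like, -1 for a cancelled like,
     * or empty when the user has already liked the article and nothing is sent.
     * operation follows the DTO convention above: 0 = like, anything else = cancel.
     */
    OptionalInt apply(long articleId, long userId, int operation) {
        Map<Long, String> byUser = likes.computeIfAbsent(articleId, id -> new HashMap<>());
        if (operation == 0) {
            if (byUser.containsKey(userId)) {
                return OptionalInt.empty(); // duplicate like: rejected, no message
            }
            byUser.put(userId, "liked");
            return OptionalInt.of(1);
        }
        // Mirrors the service: -1 is returned even if no like was stored before.
        byUser.remove(userId);
        return OptionalInt.of(-1);
    }
}
```

Because each message carries only a signed increment, the consumer side can simply sum the values per article without knowing who liked what.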

③ Modify the read-behavior class ApReadBehaviorServiceImpl so that it sends a message

Complete code:

package com.heima.behavior.service.impl;

import com.alibaba.fastjson.JSON;
import com.heima.behavior.service.ApReadBehaviorService;
import com.heima.common.constants.BehaviorConstants;
import com.heima.common.constants.HotArticleConstants;
import com.heima.common.redis.CacheService;
import com.heima.model.behavior.dtos.ReadBehaviorDto;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.common.enums.AppHttpCodeEnum;
import com.heima.model.mess.UpdateArticleMess;
import com.heima.model.user.pojos.ApUser;
import com.heima.utils.thread.AppThreadLocalUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
@Transactional
@Slf4j
public class ApReadBehaviorServiceImpl implements ApReadBehaviorService {

@Autowired
private CacheService cacheService;

@Autowired
private KafkaTemplate<String,String> kafkaTemplate;

@Override
public ResponseResult readBehavior(ReadBehaviorDto dto) {
//1. Validate the parameters
if (dto == null || dto.getArticleId() == null) {
return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
}

//2. Check that the user is logged in
ApUser user = AppThreadLocalUtil.getUser();
if (user == null) {
return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);
}
// Update the read count
String readBehaviorJson = (String) cacheService.hGet(BehaviorConstants.READ_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());
if (StringUtils.isNotBlank(readBehaviorJson)) {
ReadBehaviorDto readBehaviorDto = JSON.parseObject(readBehaviorJson, ReadBehaviorDto.class);
dto.setCount((short) (readBehaviorDto.getCount() + dto.getCount()));
}
// Save the current key
log.info("Saving key: {} {} {}", dto.getArticleId(), user.getId(), dto);
cacheService.hPut(BehaviorConstants.READ_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString(), JSON.toJSONString(dto));

// Send the message for downstream aggregation
UpdateArticleMess mess = new UpdateArticleMess();
mess.setArticleId(dto.getArticleId());
mess.setType(UpdateArticleMess.UpdateArticleType.VIEWS);
mess.setAdd(1);
kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));


return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);

}
}

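Use Kafka Streams to receive the messages in real time and aggregate them

① Integrate Kafka Streams into the leadnews-article microservice (see kafka-demo)

② Define an entity class that wraps the aggregated counts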

package com.heima.model.mess;

import lombok.Data;

@Data
public class ArticleVisitStreamMess {
/**
* Article ID
*/
private Long articleId;
/**
* Views
*/
private int view;
/**
* Collections
*/
private int collect;
/**
* Comments
*/
private int comment;
/**
* Likes
*/
private int like;
}

Update the constants class by adding a constant:

package com.heima.common.constants;

public class HotArticleConstants {

public static final String HOT_ARTICLE_SCORE_TOPIC="hot.article.score.topic";
public static final String HOT_ARTICLE_INCR_HANDLE_TOPIC="hot.article.incr.handle.topic";
}

③ Define the stream that receives and aggregates the messages

package com.heima.article.stream;

import com.alibaba.fastjson.JSON;
import com.heima.common.constants.HotArticleConstants;
import com.heima.model.mess.ArticleVisitStreamMess;
import com.heima.model.mess.UpdateArticleMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;

@Configuration
@Slf4j
public class HotArticleStreamHandler {

@Bean
public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
// Receive the messages
KStream<String,String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);
// Aggregate the stream
stream.map((key,value)->{
UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);
// Re-key the message: key = article id (e.g. 1234343434), value = "TYPE:delta" (e.g. LIKES:1)
return new KeyValue<>(mess.getArticleId().toString(),mess.getType().name()+":"+mess.getAdd());
})
// Group by article id
.groupBy((key,value)->key)
// Time window
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
/**
* Custom aggregation logic
*/
.aggregate(new Initializer<String>() {
/**
* Initializer: the return value is the initial aggregate value
* @return
*/
@Override
public String apply() {
return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";
}
/**
* The actual aggregation: the return value is the new aggregate value
*/
}, new Aggregator<String, String, String>() {
@Override
public String apply(String key, String value, String aggValue) {
if(StringUtils.isBlank(value)){
return aggValue;
}
String[] aggAry = aggValue.split(",");
int col = 0,com=0,lik=0,vie=0;
for (String agg : aggAry) {
String[] split = agg.split(":");
/**
* Read the current aggregate: the initial value, or the value computed so far in this time window
*/
switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
case COLLECTION:
col = Integer.parseInt(split[1]);
break;
case COMMENT:
com = Integer.parseInt(split[1]);
break;
case LIKES:
lik = Integer.parseInt(split[1]);
break;
case VIEWS:
vie = Integer.parseInt(split[1]);
break;
}
}
/**
* Accumulate the new value
*/
String[] valAry = value.split(":");
switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])){
case COLLECTION:
col += Integer.parseInt(valAry[1]);
break;
case COMMENT:
com += Integer.parseInt(valAry[1]);
break;
case LIKES:
lik += Integer.parseInt(valAry[1]);
break;
case VIEWS:
vie += Integer.parseInt(valAry[1]);
break;
}

String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);
System.out.println("Article id: "+key);
System.out.println("Aggregated result in the current time window: "+formatStr);
return formatStr;
}
}, Materialized.as("hot-atricle-stream-count-001"))
.toStream()
.map((key,value)->{
return new KeyValue<>(key.key().toString(),formatObj(key.key().toString(),value));
})
// Send the message
.to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);

return stream;


}

/**
* Format the aggregate value as a JSON message
* @param articleId
* @param value
* @return
*/
public String formatObj(String articleId,String value){
ArticleVisitStreamMess mess = new ArticleVisitStreamMess();
mess.setArticleId(Long.valueOf(articleId));
//COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
String[] valAry = value.split(",");
for (String val : valAry) {
String[] split = val.split(":");
switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
case COLLECTION:
mess.setCollect(Integer.parseInt(split[1]));
break;
case COMMENT:
mess.setComment(Integer.parseInt(split[1]));
break;
case LIKES:
mess.setLike(Integer.parseInt(split[1]));
break;
case VIEWS:
mess.setView(Integer.parseInt(split[1]));
break;
}
}
log.info("Result after aggregation: {}",JSON.toJSONString(mess));
return JSON.toJSONString(mess);

}
}
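
The aggregator above folds each incoming "TYPE:delta" value into a running "COLLECTION:a,COMMENT:b,LIKES:c,VIEWS:d" string. The following is a minimal, library-free sketch of that folding step so it can be run and checked outside Kafka Streams; the class name `WindowAggregateSketch` and the use of an `EnumMap` are choices made for this illustration, not the handler's actual implementation.

```java
import java.util.EnumMap;
import java.util.Map;

class WindowAggregateSketch {
    // Same four types as UpdateArticleMess.UpdateArticleType above.
    enum Type { COLLECTION, COMMENT, LIKES, VIEWS }

    // Same initial aggregate as the Initializer above.
    static final String INITIAL = "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";

    /** Folds one "TYPE:delta" value into the aggregate string and returns the new aggregate. */
    static String aggregate(String value, String aggValue) {
        if (value == null || value.trim().isEmpty()) {
            return aggValue; // nothing to add
        }
        // Parse the current aggregate into per-type counters.
        Map<Type, Integer> counts = new EnumMap<>(Type.class);
        for (String part : aggValue.split(",")) {
            String[] kv = part.split(":");
            counts.put(Type.valueOf(kv[0]), Integer.parseInt(kv[1]));
        }
        // Add the incoming delta (may be negative, e.g. a cancelled like).
        String[] kv = value.split(":");
        counts.merge(Type.valueOf(kv[0]), Integer.parseInt(kv[1]), Integer::sum);
        return String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d",
                counts.get(Type.COLLECTION), counts.get(Type.COMMENT),
                counts.get(Type.LIKES), counts.get(Type.VIEWS));
    }

    public static void main(String[] args) {
        String agg = INITIAL;
        for (String v : new String[] {"LIKES:1", "VIEWS:1", "VIEWS:1", "LIKES:-1"}) {
            agg = aggregate(v, agg);
        }
        System.out.println(agg); // COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:2
    }
}
```

Keeping the aggregate as a plain string lets the stream use String serdes for both key and value, at the cost of re-parsing the string on every message.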

Recompute the article score and update the database and the cache

① Add a method to ApArticleService that updates the article score in the database

/**
* Update the article score and refresh the hot-article data in the cache
* @param mess
*/
public void updateScore(ArticleVisitStreamMess mess);

Methods in the implementation class:

/**
* Update the article score and refresh the hot-article data in the cache
* @param mess
*/
@Override
public void updateScore(ArticleVisitStreamMess mess) {
//1. Update the article's view, like, collection and comment counts
ApArticle apArticle = updateArticle(mess);
//2. Compute the article score
Integer score = computeScore(apArticle);
score = score * 3;

//3. Replace the hot-article data of the article's channel
replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId());

//4. Replace the hot-article data of the recommendation list
replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + ArticleConstants.DEFAULT_TAG);

}

/**
* Replace the data and store it in Redis
* @param apArticle
* @param score
* @param s the Redis key of the hot-article list
*/
private void replaceDataToRedis(ApArticle apArticle, Integer score, String s) {
String articleListStr = cacheService.get(s);
if (StringUtils.isNotBlank(articleListStr)) {
List<HotArticleVo> hotArticleVoList = JSON.parseArray(articleListStr, HotArticleVo.class);

boolean flag = true;

// If the article is already in the cache, only update its score
for (HotArticleVo hotArticleVo : hotArticleVoList) {
if (hotArticleVo.getId().equals(apArticle.getId())) {
hotArticleVo.setScore(score);
flag = false;
break;
}
}

// If it is not in the cache, compare against the lowest-scoring cached entry and replace that entry when the current article scores higher
if (flag) {
if (hotArticleVoList.size() >= 30) {
hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
HotArticleVo lastHot = hotArticleVoList.get(hotArticleVoList.size() - 1);
if (lastHot.getScore() < score) {
hotArticleVoList.remove(lastHot);
HotArticleVo hot = new HotArticleVo();
BeanUtils.copyProperties(apArticle, hot);
hot.setScore(score);
hotArticleVoList.add(hot);
}


} else {
HotArticleVo hot = new HotArticleVo();
BeanUtils.copyProperties(apArticle, hot);
hot.setScore(score);
hotArticleVoList.add(hot);
}
}
// Write the sorted list back to Redis
hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
cacheService.set(s, JSON.toJSONString(hotArticleVoList));

}
}

/**
* Update the article's behavior counts
* @param mess
*/
private ApArticle updateArticle(ArticleVisitStreamMess mess) {
ApArticle apArticle = getById(mess.getArticleId());
// Treat a null counter as 0 so that the increment is not lost
apArticle.setCollection((apArticle.getCollection()==null?0:apArticle.getCollection())+mess.getCollect());
apArticle.setComment((apArticle.getComment()==null?0:apArticle.getComment())+mess.getComment());
apArticle.setLikes((apArticle.getLikes()==null?0:apArticle.getLikes())+mess.getLike());
apArticle.setViews((apArticle.getViews()==null?0:apArticle.getViews())+mess.getView());
updateById(apArticle);
return apArticle;

}

/**
* Compute the article's score
* @param apArticle
* @return
*/
private Integer computeScore(ApArticle apArticle) {
Integer score = 0;
if(apArticle.getLikes() != null){
score += apArticle.getLikes() * ArticleConstants.HOT_ARTICLE_LIKE_WEIGHT;
}
if(apArticle.getViews() != null){
score += apArticle.getViews();
}
if(apArticle.getComment() != null){
score += apArticle.getComment() * ArticleConstants.HOT_ARTICLE_COMMENT_WEIGHT;
}
if(apArticle.getCollection() != null){
score += apArticle.getCollection() * ArticleConstants.HOT_ARTICLE_COLLECTION_WEIGHT;
}

return score;
}
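
As a worked example of the scoring arithmetic above: the score is a weighted sum of likes, views, comments and collections, and updateScore then multiplies it by 3. The weights used below (3, 5 and 8) are assumptions chosen for illustration only, since the real values are defined in ArticleConstants and are not shown in this section; `HotScoreSketch` is likewise a hypothetical name.

```java
class HotScoreSketch {
    // Assumed weights, for illustration only; the real values live in ArticleConstants.
    static final int LIKE_WEIGHT = 3;
    static final int COMMENT_WEIGHT = 5;
    static final int COLLECTION_WEIGHT = 8;

    /** Weighted sum with the same shape as computeScore above (views have weight 1). */
    static int computeScore(int likes, int views, int comments, int collections) {
        return likes * LIKE_WEIGHT
                + views
                + comments * COMMENT_WEIGHT
                + collections * COLLECTION_WEIGHT;
    }

    /** The real-time path multiplies the base score by 3, as updateScore does. */
    static int realtimeScore(int likes, int views, int comments, int collections) {
        return computeScore(likes, views, comments, collections) * 3;
    }

    public static void main(String[] args) {
        // 10 likes, 100 views, 2 comments, 1 collection:
        // 10*3 + 100 + 2*5 + 1*8 = 148, and 148 * 3 = 444
        System.out.println(computeScore(10, 100, 2, 1));  // 148
        System.out.println(realtimeScore(10, 100, 2, 1)); // 444
    }
}
```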

② Define a listener that receives the aggregated data and recomputes the article score

package com.heima.article.listener;

import com.alibaba.fastjson.JSON;
import com.heima.article.service.ApArticleService;
import com.heima.common.constants.HotArticleConstants;
import com.heima.model.mess.ArticleVisitStreamMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ArticleIncrHandleListener {

@Autowired
private ApArticleService apArticleService;

@KafkaListener(topics = HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC)
public void onMessage(String mess){
if(StringUtils.isNotBlank(mess)){
ArticleVisitStreamMess articleVisitStreamMess = JSON.parseObject(mess, ArticleVisitStreamMess.class);
apArticleService.updateScore(articleVisitStreamMess);

}
}
}
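
Once the listener hands the aggregated counts to updateScore, the cached hot list is updated by replaceDataToRedis: update the score in place if the article is already listed, otherwise add it, evicting the lowest-scoring entry when the list is full and the new score is higher. The sketch below reproduces that rule on an in-memory list; `HotListSketch`, `Entry` and the `capacity` parameter are hypothetical names for this illustration (the service above uses a fixed limit of 30 and stores the list as JSON in Redis).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

class HotListSketch {
    static final class Entry {
        final long id;
        int score;
        Entry(long id, int score) { this.id = id; this.score = score; }
    }

    /** Returns a new list, sorted by score descending, after applying the update rule. */
    static List<Entry> upsert(List<Entry> current, long id, int score, int capacity) {
        // Work on copies so the caller's list and entries are left untouched.
        List<Entry> list = new ArrayList<>();
        for (Entry e : current) {
            list.add(new Entry(e.id, e.score));
        }
        boolean found = false;
        for (Entry e : list) {
            if (e.id == id) {        // already listed: only refresh the score
                e.score = score;
                found = true;
                break;
            }
        }
        if (!found) {
            if (list.size() < capacity) {
                list.add(new Entry(id, score));
            } else {
                // List is full: replace the lowest-scoring entry only if we beat it.
                Entry min = Collections.min(list, Comparator.comparingInt((Entry e) -> e.score));
                if (min.score < score) {
                    list.remove(min);
                    list.add(new Entry(id, score));
                }
            }
        }
        list.sort(Comparator.comparingInt((Entry e) -> e.score).reversed());
        return list;
    }
}
```

With a capacity of 2 and a list holding scores 50 and 20, inserting a new article with score 30 evicts the 20-point entry, while inserting one with score 10 leaves the list unchanged.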