一个轻量化的消息中间件
需要较高版本Node.js,推荐V14,支持跨平台
git clone https://gitee.com/onlyyyy/AvenirMQ.git
cd AvenirMQ
pm2 start AvenirMQ
或先安装pm2:npm i pm2 -g
并提供Nodejs版操作库:https://www.npmjs.com/package/avenirmq
用户信息保存在user.json中
async login(data) {
if (!data.name || !data.password) {
throw ('INVALID_LOGIN');
}
let password = delQuotation(libcu.cipher.AesDecode(data.password));
toLog("password = ", password);
if (!this.userList[data.name] || this.userList[data.name].password != password) {
throw ('INVALID_LOGIN');
}
//其他的就成功了
let sign = getSign(data.name, data.password);
toLog("生成签名", sign);
return sign;
}
解析routingkey
//解析绑定时的from.to.key
parseKey(keys) {
let arr = keys.split('.');
toLog('arr = ', arr);
if (arr.length != 3) {
throw ("BAD_KEYS");
}
return {
send: arr[0],
to: arr[1],
type: arr[2],
}
}
加解密: 使用libcu.cipher.AesEncode/AesDecode函数进行加解密。
密钥参见libcu库。
用户增删改查暂略。
async add2ConnectPool(data, sign, client) {
this.signPool[sign] = {
conn: client,
name: data.name,
createTime: moment().valueOf(),
};
throw ({ code: SUCCESS, data: sign });
}
将连接保存到对象中,下次发送消息的时候会优先选用连接发送,失败的话就会更新连接池
async AvenirMQSend(msg, type) {
//20210110先写个简单版的 不用promise.all发送消息
for (let i = 0; i < msg.length; i++) {
try {
······
if(type != 'gc') {
for (let j = 0; j < bind.length; j++) {
//20210116增加对类型的判断
if ((bind[j].type === sub.type || bind[j].type === AvenirMQ_ALL)
&& (bind[j].receive === sub.to)) {
//说明这是要发送的消息
let info = {
ip: bind[j].ip,
port: bind[j].port,
};
let conn = null;
if (this.connPoll[bind[j].ip] && this.connPoll[bind[j].ip][bind[j].port]) {
conn = this.connPoll[bind[j].ip][bind[j].port].conn;
}
let newSub = JSON.parse(JSON.stringify(sub));
await this.send(text, conn, info, newSub, type, i);
} else {
toLog("存在无人接收的信息", msg[i]);
}
}
} else {
let conn = null;
if (this.connPoll[gcInfo.ip] && this.connPoll[gcInfo.ip][gcInfo.port]) {
conn = this.connPoll[gcInfo.ip][gcInfo.port].conn;
}
await this.send(text, conn, gcInfo, sub, type, i);
}
····
} catch (error) {
toLog("AvenirMQSend error->",error);
}
}
}
将gc与普通的消息放在一个函数中处理。
{
type:'login',
name:'test',
password:'AES',
}
返回值
{
code:0,
message:'success',
data:'sign'
}
//key为send.to.type的结构 表示自己的键值与接收的键值 以及接收的消息类型
//告诉AvenirMQ自己的连接信息
{
"type":"addUser",
"name":"test",
"password":"123456"
"key":"a.b.rpc"
"ip":"127.0.0.1"
"port":13000,
}
返回值
{
"code":0,
"message":"success",
}
{
"type":"deleteUser",
"name":"test",
}
返回值
{
"code":0,
"message":"success",
}
//key为send.to.type的结构 send : 接收名称为send的发送方消息,to:发送给谁 type:消息类型
//告诉AvenirMQ自己的连接信息
{
"type":"updateUser",
"name":"test",
"password":"123456"
"key":"a.b.rpc"
"ip":"127.0.0.1"
"port":13000,
}
返回值
{
"code":0,
"message":"success",
}
//生产者->消费者的概念
{
sign:"test",
type:"send",
data:"hello world"
}
返回值
{
"code":0,
"message":"success",
}
{
sign:'test',
type:'setKey',
data: {
name:"test",
key:"a.b.rpc",
}
}
返回值
{
code:0,
message:'success',
}
{
code:0,
message:'success',
sender:'发送方的名字',
data:'消息',
}
{
type:'userList',
}
返回值
{
code:0,
message:'success',
data:[a,b],
}
[main]
ip=127.0.0.1
port=52013
[mq]
#用户文件路径
userFileName=./user.json
#是否输出日志
ifConsoleLog=true
#连接超时时间
timeOut=10
#是否为长连接
keepAlive=true
#AvenirMQ发消息的超时时间(秒)
MQTimeOut=3
#AvenirMQ重发消息的周期(秒) 范围 2-50
MQResend=2
#重试次数
retryTime=5
本项目照着RabbitMQ的思想简单地实现了一个消息中间件,不过没有使用AMQP协议,而只是简单的tcp处理,只能后期再优化了。
不过当这个项目在我脑海中浮现,我就认为我应该通过努力将它编写出来。目前AvenirMQ也达到了能用的程度了,这一路上学到的知识也是久久难忘的。
技术没有高低贵贱之分,脑海中如果有想法的话,我们要做的就是去把它实现。
百舸争流,奋楫者先。
编程之路漫漫修远兮,吾将上下而求索。
谢谢。
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Комментарии ( 0 )