实现简单的多节点抢注(主)功能

博客 分享
0 212
张三
张三 2022-06-08 12:00:13
悬赏:0 积分 收藏

实现简单的多节点抢注(主)功能

前言

  在分布式系统中经常会遇到某个业务仅需要单个节点执行的场景,通常这样做是为了解决并发引起的状态不一致问题。但是为了防止出现单点故障,又需要为这些节点做故障转移的实现。简单的方案是同时起多个节点,但是只有一个节点作为主节点执行业务,其他的作为备份节点需要实时跟踪主节点运行状态,一旦发现主节点挂掉就将自己转变为主节点进行业务处理,这也就是所谓的“多节点抢注(主)”。

 

实现

  实现一个简单的多节点抢注功能并不复杂,只需要借助一些中间件进行状态维护就可以做到,这里使用大家常用的redis作为实现方案。这个最小实现方案中仅涉及三个参与角色,分别是:

  WorkNode -- 工作节点

  Redis -- Redis缓存

  CallbackHandler -- 节点注册成功回调(代表业务处理)

具体实现代码如下(仅供参考,相关细节需要自己根据业务进一步实现)

WorkNode

package me.kongdl.ezwok.work;import java.util.Random;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicBoolean;/** * @author: kongdl * @date: 2022/6/7 14:55 * @description: 分布式节点抢占注册模型 **/public class WorkNode extends Thread {    // 业务代码    private String code;    // 节点ID    private String nodeId;    // redis    private Redis redis;    // 主节点标志    private AtomicBoolean master;    // 注册成功回调接口    private CallbackHandler callbackHandler;    public WorkNode(String code, String nodeId, CallbackHandler callbackHandler) {        this.code = code;        this.nodeId = nodeId;        this.redis = new Redis();        this.master = new AtomicBoolean(false);        this.callbackHandler = callbackHandler;    }    @Override    public void run() {        // master:event-handler => <NODE_ID>        String key = "master:" + code;        while (!registerAsMaster(key)) {            try {                Thread.sleep(1000);            } catch (InterruptedException e) {                e.printStackTrace();                return;            }        }        redis.expire(key, 5, TimeUnit.SECONDS);        while (true) { // 节点续期            try {                Thread.sleep(3000);            } catch (InterruptedException e) {                e.printStackTrace();                break;            }            redis.expire(key, 5, TimeUnit.SECONDS);        }    }    /**     * 尝试注册为主节点     * @param key     * @return true-成功,false-失败     */    private boolean registerAsMaster(String key) {        boolean result = redis.setnx(key, nodeId);        if (result) {            master.set(true);            // callback in async mode            new Thread(() -> callbackHandler.handle(this)).start();        }        return result;    }    /**     * 当前节点是否为主节点     * @return     */    public boolean isMaster() {        return master.get();    }    /**     * 业务代码     * @return     */    public String getCode() {        return code;    }    /**     * 节点ID     * @return     */    public String getNodeId() {        return nodeId;    }}
View Code

 

Redis

package me.kongdl.ezwok.work;import org.springframework.data.redis.connection.RedisStandaloneConfiguration;import org.springframework.data.redis.connection.jedis.JedisClientConfiguration;import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;import org.springframework.data.redis.core.StringRedisTemplate;import redis.clients.jedis.JedisPoolConfig;import java.util.concurrent.TimeUnit;/** * @author: kongdl * @date: 2022/6/7 14:58 * @description: Redis **/public class Redis {    private final StringRedisTemplate redisTemplate;    public Redis() {        this.redisTemplate = initRedisTemplate();    }    private StringRedisTemplate initRedisTemplate() {        RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration();        standaloneConfig.setHostName("localhost");        standaloneConfig.setPort(6379);        standaloneConfig.setPassword("Redis#321");        standaloneConfig.setDatabase(0);        JedisPoolConfig poolConfig = new JedisPoolConfig();        poolConfig.setMinIdle(2);        poolConfig.setMaxIdle(10);        poolConfig.setMaxTotal(100);        JedisClientConfiguration clientConfiguration = JedisClientConfiguration.builder().usePooling().poolConfig(poolConfig).build();        JedisConnectionFactory connectionFactory = new JedisConnectionFactory(standaloneConfig, clientConfiguration);        connectionFactory.afterPropertiesSet();        StringRedisTemplate redisTemplate = new StringRedisTemplate(connectionFactory);        redisTemplate.afterPropertiesSet();        return redisTemplate;    }    public boolean setnx(String key, String value) {        return redisTemplate.opsForValue().setIfAbsent(key, value);    }    public boolean expire(String key, long timeout, TimeUnit unit) {        return redisTemplate.expire(key, timeout, unit);    }}
View Code

 

CallbackHandler

package me.kongdl.ezwok.work;/** * @author: kongdl * @date: 2022/6/7 15:58 * @description: 回调处理接口 **/public interface CallbackHandler {    /**     * 回调处理     */    void handle(WorkNode node);}
View Code

 

测试

/*** 模拟多节点下的运行状况**/public class Demo {    public static void main(String[] args) {        // 业务代码,用以区分不同的业务        String code = "event-handler";        // 节点注册成功后的回调处理        final CallbackHandler callbackHandler = node -> {            // 执行业务操作            System.out.println(node.getCode() + "<" + node.getNodeId() + "> registered as master ok!");            Random random = new Random(System.currentTimeMillis());            try { // 模拟线程随机挂掉                TimeUnit.SECONDS.sleep(random.nextInt(10));                node.interrupt();            } catch (InterruptedException e) {                e.printStackTrace();            }        };        // 启动多个节点        for (int i = 1; i <= 10; i++) {            String nodeId = "node-" + i;            new WorkNode(code, nodeId, callbackHandler).start();        }    }}

 

结论

  测试代码执行后会发现某个节点注册成功,运行一段时间(几秒)后挂掉,后续备份节点会自动注册成为主节点并接替执行业务,从而证明了该模型具备了基本的节点抢注和故障转移功能。当然,实际生产环境具有更复杂的场景和业务需求,但是可以认为都是在这个基础上进行了相关扩展。

 

posted @ 2022-06-08 10:39 lichmama 阅读(7) 评论(0) 编辑 收藏 举报
回帖
    张三

    张三 (王者 段位)

    821 积分 (2)粉丝 (41)源码

     

    温馨提示

    亦奇源码

    最新会员