隐藏

SolrCloud-如何在.NET程序中使用

发布:2022/6/18 23:29:34作者:管理员 来源:本站 浏览次数:751

https://github.com/vladen/SolrNet


原来我们在我们的项目里用的是根据数据库路由到不同的单机Solr服务器,但是这样的话,每次Solr配置的修改都要修改三台不通的服务器,而且一台服务器挂了,必定会影响一部分用户不能使用搜索功能,而且还会造成一定程度的丢数据,所以我们换一种方式。


   两种可选方案:


       主从模式

       SolrCloud


经过对比,决定用SolrCloud,SolrCloud的概念和优缺点,就不再赘述了,网上一搜一大堆,这里主要写一下在C#如何使用SolrCloud。


最早我们用 EasyNet.Solr 来进行Solr查询的,但是貌似EasyNet.Solr 没有对SolrCloud的查询做封装,所以就各种找资料,最后只能自己封装了。


首先SolrCloud是通过zookeeper来调度的,那么我们就要先去zookeeper上面去load可用的节点,并且对 zookeeper的 clusterstate.json 文件做心跳检测,如果clusterstate.json 里的内容有变化,则说明节点状态有变化,需要重新load文件里的内容进行解析,否则就在应用程序第一次调用的时候加载一次,缓存起来。


献上核心代码


要先通过NuGet安装Zookeeper Client


在EasyNet.Solr根目录定义接口ISolrCloudOperrations.cs


   using System;

   using System.Collections.Generic;

   using System.Linq;

   using System.Text;

   

   namespace EasyNet.Solr

   {

    public interface ISolrCloudOperrations

    {

    string GetSolrCloudServer(string collectionName, bool isWrite);

    }

   }


在Impl文件夹里实现接口 SolrCloudOperations.cs


   internal static class ZookeeperStatus

    {

    //每隔两秒钟pingzookeeper服务器,并监听solr服务器状态,如果状态有改变,更新状态文件,请求连接列表

    private static void Ping(string zkHost) {

    //如果是第一次调用,则加载配置文件

    if (DataLength == 0)

    {

    foreach (var host in zkHost.Split(',').ToList())

    {

    Start(host);

    }

    Task.Factory.StartNew(() =>

    {

    while (true)

    {

    foreach (var host in zkHost.Split(',').ToList())

    {

    Task.Factory.StartNew(Start, host);

    }

    Thread.Sleep(2000);

    }

    });

    }

    }

    private static object pingObj = new object();

    private static void Start(object zkHost) {

    var watcher = new Watcher();

    using (var zk = new ZooKeeper(zkHost.ToString(), new TimeSpan(0, 0, 0, 10000), watcher))

    {

    var dataChange = watcher.WaitUntilWatched();

    Org.Apache.Zookeeper.Data.Stat stat = null;

    try

    {

    stat = zk.Exists("/clusterstate.json", false);

    }

    finally

    {

    if (stat != null)

    {

    byte[] data = null;

    lock (pingObj)

    {

    if (DataLength == 0 || DataLength != stat.DataLength)

    {

    data = zk.GetData("/clusterstate.json", false, stat);

    DataLength = stat.DataLength;

    SetShard(data);

    }

    }

    }

    }

    }

    }

   

    private static int DataLength = 0;

   

    private static Dictionary<string, List<Shard>> shards = new Dictionary<string, System.Collections.Generic.List<Shard>>();

    private static ReaderWriterLockSlim rw = new ReaderWriterLockSlim();

    private static void SetShard(byte[] data)

    {

    var str = System.Text.Encoding.UTF8.GetString(data);

    JObject jObj = JsonConvert.DeserializeObject<JObject>(str);

    List<Shard> shardList = new List<Shard>();

    try

    {

    rw.EnterWriteLock();

    foreach (var p in jObj)

    {

    foreach (var item in p.Value["shards"])

    {

    var jItem = item.First as JObject;

    if (jItem["state"].ToString() == "active")

    {

    foreach (var replica in jItem["replicas"])

    {

    var jReplica = replica.First as JObject;

    if (jReplica["state"].ToString() == "active")

    {

    Shard shard = new Shard() { BaseUrl = jReplica["base_url"].ToString() };

    if (jReplica["leader"] != null && "true" == jReplica["leader"].ToString())

    {

    shard.Leader = true;

    }

    shardList.Add(shard);

    }

    }

    }

    }

    if (shards.ContainsKey(p.Key))

    {

    shards[p.Key] = shardList;

    }

    else

    {

    shards.Add(p.Key, shardList);

    }

    }

    }

    finally

    {

    rw.ExitWriteLock();

    }

    }

   

    public static string ZookeeperHost = "127.0.0.1:2181";

   

    public static string GetCollection(string collectionName, bool isWrite)

    {

    ///第一次调用,则开始ping

    if (DataLength == 0)

    {

    Ping(ZookeeperHost);

    }

    IEnumerable<Shard> tempShardList = null;

    try

    {

    rw.EnterReadLock();

    var shardList = shards[collectionName];

   

    if (!isWrite)

    {

    tempShardList = shardList.Where(s => s.Leader == false);

    }

    //如果从库挂了,那么只能从主库读取了

    if (tempShardList == null || tempShardList.Count() == 0)

    tempShardList = shardList.Where(s => s.Leader == true);

    }

    finally

    {

    rw.ExitReadLock();

    }

    if (tempShardList == null) throw new Exception("no active shard");

    //随机取值

    int random = new Random().Next(tempShardList.Count() - 1);

    return tempShardList.ToList()[random].BaseUrl;

    }

   

    }

    internal class Shard

    {

    public string BaseUrl { get; set; }

    public bool Leader { get; set; }

    }

    internal enum Changed

    {

    None,

    Children,

    Data

    }

    internal class Watcher : IWatcher

    {

    private readonly ManualResetEventSlim _changed = new ManualResetEventSlim(false);

    private WatchedEvent _event;

    public Changed WaitUntilWatched()

    {

    _changed.Wait();

    if (_event == null) throw new ApplicationException("bad state");

    if (_event.State != KeeperState.SyncConnected)

    throw new ApplicationException("cannot connect");

    if (_event.Type == EventType.NodeChildrenChanged)

    {

    return Changed.Children;

    }

    if (_event.Type == EventType.NodeDataChanged)

    {

    return Changed.Data;

    }

    return Changed.None;

    }

    void IWatcher.Process(WatchedEvent @event)

    {

    _event = @event;

    _changed.Set();

    }

    }


这样我们如果是write操作,就会调用leader分片对应的节点进行写入,如果是查询操作,则尽量直接调用非leader来进行查询,如果非leader节点挂了,那么久从主节点进行查询。