• 欢迎访问搞代码网站,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站!
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏搞代码吧

C# HttpClient 如何使用 Consul 发现服务

c# 搞代码 4年前 (2022-01-09) 16次浏览 已收录 0个评论

试用了Overt.Core.Grpc, 把 GRPC 的使用改造得像 WCF, 性能测试也非常不错, 非常推荐各位使用.
  但已有项目大多是 http 请求, 改造成 GRPC 的话, 工作量比较大, 于是又找到了 Steeltoe.Discovery, 在 Startup 给 HttpClient 添加 DelegatingHandler, 动态改变请求url中的 host 和 port, 将http请求指向consul 发现的服务实例, 这样就实现了服务的动态发现.
  经过性能测试, Steeltoe.Discovery 只有 Overt.Core.Grpc 的20%, 非常难以接受, 于是自己实现了一套基于 consul 的服务发现工具. 嗯, 名字好难取啊, 暂定为 ConsulDiscovery.HttpClient 吧
  功能很简单:

  1. webapi 从json中读取配置信息 ConsulDiscoveryOptions;
  2. 如果自己是一个服务, 则将自己注册到consul中并设置健康检查Url;
  3. ConsulDiscovery.HttpClient 内有一个consul client 定时刷新所有服务的url访问地址.

比较核心的两个类

using Consul;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace ConsulDiscovery.HttpClient
{
  public class DiscoveryClient : IDisposable
  {
    private readonly ConsulDiscoveryOptions consulDiscoveryOptions;
    private readonly Timer timer;
    private readonly ConsulClient consulClient;
    private readonly string serviceIdInConsul;

    public Dictionary<string, List<string>> AllServices { get; private set; } = new Dictionary<string, List<string>>(StringComparer.OrdinalIgnoreCase);


    public DiscoveryClient(IOptions<ConsulDiscoveryOptions> options)
    {
      consulDiscoveryOptions = options.Value;
      consulClient = new ConsulClient(x => x.Address = new Uri($"http://{consulDiscoveryOptions.ConsulServerSetting.IP}:{consulDiscoveryOptions.ConsulServerSetting.Port}"));
      timer = new Timer(Refresh);

      if (consulDiscoveryOptions.ServiceRegisterSetting != null)
      {
        serviceIdInConsul = Guid.NewGuid().ToString();
      }
    }

    public void Start()
    {
      var checkErrorMsg = CheckParams();
      if (checkErrorMsg != null)
      {
        throw new ArgumentException(checkErrorMsg);
      }
      RegisterToConsul();
      timer.Change(0, consulDiscoveryOptions.ConsulServerSetting.RefreshIntervalInMilliseconds);
    }

    public void Stop()
    {
      Dispose();
    }

    private string CheckParams()
    {
      if (string.IsNullOrWhiteSpace(consulDiscoveryOptions.ConsulServerSetting.IP))
      {
        return "Consul服务器地址 ConsulDiscoveryOptions.ConsulServerSetting.IP 不能为空";
      }

      if (consulDiscoveryOptions.ServiceRegisterSetting != null)
      {
        var registerSetting = consulDiscoveryOptions.ServiceRegisterSetting;
        if (string.IsNullOrWhiteSpace(registerSetting.ServiceName))
        {
          return "服务名称 ConsulDiscoveryOptions.ServiceRegisterSetting.ServiceName 不能为空";
        }
        if (string.IsNullOrWhiteSpace(registerSetting.ServiceIP))
        {
          return "服务地址 ConsulDiscoveryOptions.ServiceRegisterSetting.ServiceIP 不能为空";
        }
      }
      return null;
    }

    private void RegisterToConsul()
    {
      if (string.IsNullOrEmpty(serviceIdInConsul))
      {
        return;
      }

      var registerSetting = consulDiscoveryOptions.ServiceRegisterSetting;
      var httpCheck = new AgentServiceCheck()
      {
        HTTP = $"{registerSetting.ServiceScheme}{Uri.SchemeDelimiter}{registerSetting.ServiceIP}:{registerSetting.ServicePort}/{registerSetting.HealthCheckRelativeUrl.TrimStart('/')}",
        Interval = TimeSpan.FromMilliseconds(registerSetting.HealthCheckIntervalInMilliseconds),
        Timeout = TimeSpan.FromMilliseconds(registerSetting.HealthCheckTimeOutInMilliseconds),
        DeregisterCriticalServiceAfter = TimeSpan.FromSeconds(10),
      };
      var registration = new AgentServiceRegistration()
      {
        ID = serviceIdInConsul,
        Name = registerSetting.ServiceName,
        Address = registerSetting.ServiceIP,
        Port = registerSetting.ServicePort,
        Check = httpCheck,
        Meta = new Dictionary<string, string>() { ["scheme"] = registerSetting.ServiceScheme },
      };
      consulClient.Agent.ServiceRegister(registration).Wait();
    }

    private void DeregisterFromConsul()
    {
      if (string.IsNullOrEmpty(serviceIdInConsul))
      {
        return;
      }
      try
      {
        consulClient.Agent.ServiceDeregister(serviceIdInConsul).Wait();
      }
      catch
      { }
    }

    private void Refresh(object state)
    {
      Dictionary<string, AgentService>.ValueCollection serversInConsul;
      try
      {
        serversInConsul = consulClient.Agent.Services().Result.Response.Values;
      }
      catch // (Exception ex)
      {
        // 如果连接consul出错, 则不更新服务列表. 继续使用以前获取到的服务列表
        // 但是如果很长时间都不能连接consul, 服务列表里的一些实例已经不可用了, 还一直提供这样旧的列表也不合理, 所以要不要在这里实现 健康检查? 这样的话, 就得把检查地址变成不能设置的
        return;
      }

      // 1. 更新服务列表
      // 2. 如果这个程序提供了服务, 还要检测 服务Id 是否在服务列表里
      var tempServices = new Dictionary<string, HashSet<string>>();
      bool needReregisterToConsul = true;
      foreach (var service in serversInConsul)
      {
        var serviceName = service.Service;
        if (!service.Meta.TryGetValue("scheme", out var serviceScheme))
        {
          serviceScheme = Uri.UriSchemeHttp;
        }
        var serviceHost = $"{serviceScheme}{Uri.SchemeDelimiter}{service.Address}:{service.Port}";
        if (!tempServices.TryGetValue(serviceName, out var serviceHosts))
        {
          serviceHosts = new HashSet<string>();
          tempServices[serviceName] = serviceHosts;
        }
        serviceHosts.Add(serviceHost);

        if (needReregisterToConsul && !string.IsNullOrEmpty(serviceIdInConsul) && serviceIdInConsul == service.ID)
        {
          needReregisterToConsul = false;
        }
      }

      if (needReregisterToConsul)
      {
        RegisterToConsul<div>本文来源gaodai^.ma#com搞#代!码网</div>();
      }

      var tempAllServices = new Dictionary<string, List<string>>(StringComparer.OrdinalIgnoreCase);
      foreach (var item in tempServices)
      {
        tempAllServices[item.Key] = item.Value.ToList();
      }
      AllServices = tempAllServices;
    }


    public void Dispose()
    {
      DeregisterFromConsul();
      consulClient.Dispose();
      timer.Dispose();
    }
  }
}
using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

namespace ConsulDiscovery.HttpClient
{
  public class DiscoveryHttpMessageHandler : DelegatingHandler
  {
    private static readonly Random random = new Random((int)DateTime.Now.Ticks);

    private readonly DiscoveryClient discoveryClient;

    public DiscoveryHttpMessageHandler(DiscoveryClient discoveryClient)
    {
      this.discoveryClient = discoveryClient;
    }

    protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
    {
      if (discoveryClient.AllServices.TryGetValue(request.RequestUri.Host, out var serviceHosts))
      {
        if (serviceHosts.Count > 0)
        {
          var index = random.Next(serviceHosts.Count);
          request.RequestUri = new Uri(new Uri(serviceHosts[index]), request.RequestUri.PathAndQuery);
        }
      }
      return await base.SendAsync(request, cancellationToken).ConfigureAwait(false);
    }
  }
}

搞代码网(gaodaima.com)提供的所有资源部分来自互联网,如果有侵犯您的版权或其他权益,请说明详细缘由并提供版权或权益证明然后发送到邮箱[email protected],我们会在看到邮件的第一时间内为您处理,或直接联系QQ:872152909。本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:C# HttpClient 如何使用 Consul 发现服务
喜欢 (0)
[搞代码]
分享 (0)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址