试用了Overt.Core.Grpc, 把 GRPC 的使用改造得像 WCF, 性能测试也非常不错, 非常推荐各位使用.
但已有项目大多是 http 请求, 改造成 GRPC 的话, 工作量比较大, 于是又找到了 Steeltoe.Discovery, 在 Startup 给 HttpClient 添加 DelegatingHandler, 动态改变请求url中的 host 和 port, 将http请求指向consul 发现的服务实例, 这样就实现了服务的动态发现.
经过性能测试, Steeltoe.Discovery 的性能只有 Overt.Core.Grpc 的 20%, 非常难以接受, 于是自己实现了一套基于 consul 的服务发现工具. 嗯, 名字好难取啊, 暂定为 ConsulDiscovery.HttpClient 吧.
功能很简单:
- webapi 从json中读取配置信息 ConsulDiscoveryOptions;
- 如果自己是一个服务, 则将自己注册到consul中并设置健康检查Url;
- ConsulDiscovery.HttpClient 内有一个consul client 定时刷新所有服务的url访问地址.
比较核心的两个类
using Consul;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace ConsulDiscovery.HttpClient
{
    /// <summary>
    /// Periodically pulls the service list from a Consul agent and, when
    /// <c>ServiceRegisterSetting</c> is configured, registers this process as a
    /// service instance with an HTTP health check.
    /// </summary>
    public class DiscoveryClient : IDisposable
    {
        private readonly ConsulDiscoveryOptions consulDiscoveryOptions;
        private readonly Timer timer;
        private readonly ConsulClient consulClient;

        // Null when this process does not register itself as a service.
        private readonly string serviceIdInConsul;

        // 1 while a Refresh callback is running; used to skip overlapping timer ticks.
        private int refreshing;

        // 1 once Dispose has run; makes Dispose idempotent and stops re-registration.
        private int disposed;

        /// <summary>
        /// Latest snapshot of service name (case-insensitive) to base URLs
        /// ("scheme://address:port"). The dictionary instance is replaced as a whole
        /// on every successful refresh and is never mutated after being published.
        /// </summary>
        public Dictionary<string, List<string>> AllServices { get; private set; } = new Dictionary<string, List<string>>(StringComparer.OrdinalIgnoreCase);

        /// <summary>
        /// Creates the Consul client from <c>ConsulServerSetting.IP</c>/<c>Port</c> and
        /// prepares (but does not start) the refresh timer.
        /// </summary>
        /// <param name="options">Discovery options; <c>options.Value</c> is read once.</param>
        public DiscoveryClient(IOptions<ConsulDiscoveryOptions> options)
        {
            consulDiscoveryOptions = options.Value;
            consulClient = new ConsulClient(x => x.Address = new Uri($"http://{consulDiscoveryOptions.ConsulServerSetting.IP}:{consulDiscoveryOptions.ConsulServerSetting.Port}"));
            timer = new Timer(Refresh);

            if (consulDiscoveryOptions.ServiceRegisterSetting != null)
            {
                serviceIdInConsul = Guid.NewGuid().ToString();
            }
        }

        /// <summary>
        /// Validates the options, registers this instance in Consul (if configured)
        /// and starts the periodic refresh, with the first refresh firing immediately.
        /// </summary>
        /// <exception cref="ArgumentException">A required option is missing.</exception>
        public void Start()
        {
            var checkErrorMsg = CheckParams();
            if (checkErrorMsg != null)
            {
                throw new ArgumentException(checkErrorMsg);
            }

            RegisterToConsul();
            timer.Change(0, consulDiscoveryOptions.ConsulServerSetting.RefreshIntervalInMilliseconds);
        }

        /// <summary>Stops the client; equivalent to <see cref="Dispose"/>.</summary>
        public void Stop()
        {
            Dispose();
        }

        /// <summary>
        /// Checks the required options.
        /// </summary>
        /// <returns>An error message, or <c>null</c> when the options are valid.</returns>
        private string CheckParams()
        {
            if (string.IsNullOrWhiteSpace(consulDiscoveryOptions.ConsulServerSetting.IP))
            {
                return "Consul服务器地址 ConsulDiscoveryOptions.ConsulServerSetting.IP 不能为空";
            }

            if (consulDiscoveryOptions.ServiceRegisterSetting != null)
            {
                var registerSetting = consulDiscoveryOptions.ServiceRegisterSetting;
                if (string.IsNullOrWhiteSpace(registerSetting.ServiceName))
                {
                    return "服务名称 ConsulDiscoveryOptions.ServiceRegisterSetting.ServiceName 不能为空";
                }

                if (string.IsNullOrWhiteSpace(registerSetting.ServiceIP))
                {
                    return "服务地址 ConsulDiscoveryOptions.ServiceRegisterSetting.ServiceIP 不能为空";
                }
            }

            return null;
        }

        /// <summary>
        /// Registers this instance with the Consul agent, including its HTTP health
        /// check. Does nothing when this process is not configured as a service.
        /// Blocks until the agent call completes and lets its exceptions propagate.
        /// </summary>
        private void RegisterToConsul()
        {
            if (string.IsNullOrEmpty(serviceIdInConsul))
            {
                return;
            }

            var registerSetting = consulDiscoveryOptions.ServiceRegisterSetting;

            // A missing relative URL means "health check at the root" instead of a NullReferenceException.
            var healthCheckPath = (registerSetting.HealthCheckRelativeUrl ?? string.Empty).TrimStart('/');

            var httpCheck = new AgentServiceCheck()
            {
                HTTP = $"{registerSetting.ServiceScheme}{Uri.SchemeDelimiter}{registerSetting.ServiceIP}:{registerSetting.ServicePort}/{healthCheckPath}",
                Interval = TimeSpan.FromMilliseconds(registerSetting.HealthCheckIntervalInMilliseconds),
                Timeout = TimeSpan.FromMilliseconds(registerSetting.HealthCheckTimeOutInMilliseconds),
                DeregisterCriticalServiceAfter = TimeSpan.FromSeconds(10),
            };

            var registration = new AgentServiceRegistration()
            {
                ID = serviceIdInConsul,
                Name = registerSetting.ServiceName,
                Address = registerSetting.ServiceIP,
                Port = registerSetting.ServicePort,
                Check = httpCheck,
                // The scheme travels in the service metadata so that consumers can rebuild the base URL.
                Meta = new Dictionary<string, string>() { ["scheme"] = registerSetting.ServiceScheme },
            };

            consulClient.Agent.ServiceRegister(registration).Wait();
        }

        /// <summary>
        /// Best-effort removal of this instance from Consul; failures are ignored
        /// on purpose because this runs during shutdown.
        /// </summary>
        private void DeregisterFromConsul()
        {
            if (string.IsNullOrEmpty(serviceIdInConsul))
            {
                return;
            }

            try
            {
                consulClient.Agent.ServiceDeregister(serviceIdInConsul).Wait();
            }
            catch
            {
                // Deliberately swallowed: nothing useful can be done while shutting down.
            }
        }

        /// <summary>
        /// Timer callback: rebuilds <see cref="AllServices"/> from the Consul agent and
        /// re-registers this instance when its ID is no longer present in the list.
        /// </summary>
        /// <param name="state">Unused timer state.</param>
        private void Refresh(object state)
        {
            if (Volatile.Read(ref disposed) != 0)
            {
                return;
            }

            // Timer callbacks may overlap when a refresh takes longer than the interval;
            // skip this tick if the previous one is still running.
            if (Interlocked.CompareExchange(ref refreshing, 1, 0) != 0)
            {
                return;
            }

            try
            {
                Dictionary<string, AgentService>.ValueCollection serversInConsul;
                try
                {
                    serversInConsul = consulClient.Agent.Services().Result.Response.Values;
                }
                catch
                {
                    // If Consul cannot be reached, keep serving the previously fetched list.
                    // Open question from the original author: when Consul stays unreachable
                    // for a long time the cached instances may be dead, so a local health
                    // check might be needed here (which would require a fixed check URL).
                    return;
                }

                // 1. Rebuild the service list.
                // 2. If this process provides a service, verify that its ID is still registered.
                var hostsByService = new Dictionary<string, HashSet<string>>(StringComparer.OrdinalIgnoreCase);
                bool needReregisterToConsul = true;

                foreach (var service in serversInConsul)
                {
                    if (needReregisterToConsul && !string.IsNullOrEmpty(serviceIdInConsul) && serviceIdInConsul == service.ID)
                    {
                        needReregisterToConsul = false;
                    }

                    var serviceName = service.Service;
                    if (string.IsNullOrEmpty(serviceName))
                    {
                        // A null key would throw inside the dictionary; such an entry is unusable anyway.
                        continue;
                    }

                    // Services registered by other tools may carry no metadata at all.
                    string serviceScheme = null;
                    if (service.Meta != null)
                    {
                        service.Meta.TryGetValue("scheme", out serviceScheme);
                    }

                    if (string.IsNullOrEmpty(serviceScheme))
                    {
                        serviceScheme = Uri.UriSchemeHttp;
                    }

                    var serviceHost = $"{serviceScheme}{Uri.SchemeDelimiter}{service.Address}:{service.Port}";

                    if (!hostsByService.TryGetValue(serviceName, out var serviceHosts))
                    {
                        serviceHosts = new HashSet<string>();
                        hostsByService[serviceName] = serviceHosts;
                    }

                    serviceHosts.Add(serviceHost);
                }

                if (needReregisterToConsul && Volatile.Read(ref disposed) == 0)
                {
                    try
                    {
                        RegisterToConsul();
                    }
                    catch
                    {
                        // A failed re-registration must not take down the timer thread or
                        // block the list update; the next tick will try again.
                    }
                }

                var tempAllServices = new Dictionary<string, List<string>>(StringComparer.OrdinalIgnoreCase);
                foreach (var item in hostsByService)
                {
                    tempAllServices[item.Key] = item.Value.ToList();
                }

                // Publish the new snapshot with a single reference assignment.
                AllServices = tempAllServices;
            }
            finally
            {
                Interlocked.Exchange(ref refreshing, 0);
            }
        }

        /// <summary>
        /// Stops the refresh timer, deregisters this instance from Consul (best effort)
        /// and releases the Consul client. Safe to call more than once.
        /// </summary>
        public void Dispose()
        {
            if (Interlocked.Exchange(ref disposed, 1) != 0)
            {
                return;
            }

            // Stop the timer first so that a later refresh cannot re-register the instance.
            timer.Dispose();
            DeregisterFromConsul();
            consulClient.Dispose();
        }
    }
}
using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

namespace ConsulDiscovery.HttpClient
{
    /// <summary>
    /// Delegating handler that treats the host of the outgoing request URI as a
    /// service name and, when <see cref="DiscoveryClient.AllServices"/> knows that
    /// name, rewrites scheme/host/port to a randomly chosen instance of the service.
    /// Requests for unknown hosts are forwarded unchanged.
    /// </summary>
    public class DiscoveryHttpMessageHandler : DelegatingHandler
    {
        private static readonly Random random = new Random((int)DateTime.Now.Ticks);

        // System.Random is not thread-safe; unsynchronised concurrent use can corrupt
        // its state so that Next() keeps returning 0, which would pin all traffic to
        // the first instance. Every access therefore goes through this lock.
        private static readonly object randomLock = new object();

        private readonly DiscoveryClient discoveryClient;

        /// <summary>Creates the handler.</summary>
        /// <param name="discoveryClient">Source of the current service-to-instance map.</param>
        public DiscoveryHttpMessageHandler(DiscoveryClient discoveryClient)
        {
            this.discoveryClient = discoveryClient;
        }

        /// <summary>
        /// Redirects the request to a discovered service instance (if any) and passes
        /// it on to the inner handler.
        /// </summary>
        /// <param name="request">Outgoing request; its <c>RequestUri</c> may be replaced.</param>
        /// <param name="cancellationToken">Forwarded to the inner handler.</param>
        /// <returns>The response produced by the inner handler.</returns>
        protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
        {
            // Take one snapshot: the discovery client replaces the dictionary on refresh.
            var services = discoveryClient.AllServices;
            var originalUri = request.RequestUri;

            if (originalUri != null
                && services != null
                && services.TryGetValue(originalUri.Host, out var serviceHosts)
                && serviceHosts.Count > 0)
            {
                int index;
                lock (randomLock)
                {
                    index = random.Next(serviceHosts.Count);
                }

                // Keep path and query, swap scheme/host/port for the selected instance.
                request.RequestUri = new Uri(new Uri(serviceHosts[index]), originalUri.PathAndQuery);
            }

            return await base.SendAsync(request, cancellationToken).ConfigureAwait(false);
        }
    }
}