使用 etcd 实现自定义服务发现
2015年8月17日作者 Fabian Reinartz
在之前的一篇文章中,我们介绍了在 Prometheus 中实现服务发现的多种新方法。从那时起,发生了很多变化。我们改进了内部实现,并收到了来自社区的杰出贡献,增加了对 Kubernetes 和 Marathon 服务发现的支持。这些功能将在 0.16 版本中发布。
我们还谈到了自定义服务发现的话题。
并非每种服务发现都足够通用,可以直接集成到 Prometheus 中。很可能您的组织已经有了一套专有系统,而您只需要让它与 Prometheus 协同工作。这并不意味着您不能享受自动发现新监控目标带来的好处。
在本文中,我们将实现一个小工具程序,它能将一个基于 etcd(一个高一致性的分布式键值存储)的自定义服务发现方法连接到 Prometheus。
etcd 和 Prometheus 中的目标
我们虚构的服务发现系统在一个定义明确的键模式下存储服务及其服务实例。
/services/<service_name>/<instance_id> = <instance_address>
现在,Prometheus 应该能够随着所有现有服务的出现和消失,自动添加和移除目标。我们可以与 Prometheus 基于文件的服务发现集成,该功能会监控一组以 JSON 格式描述目标(作为目标组列表)的文件。
一个单独的目标组由一个与一组标签相关联的地址列表组成。这些标签会附加到从这些目标检索到的所有时间序列上。一个从我们的 etcd 服务发现中提取的目标组示例可能如下所示:
{
"targets": ["10.0.33.1:54423", "10.0.34.12:32535"],
"labels": {
"job": "node_exporter"
}
}
程序
我们需要一个小程序,它能连接到 etcd 集群,查询 /services
路径下的所有服务,并将它们写入一个目标组文件。
让我们从一些基础工作开始。我们的工具有两个标志:要连接的 etcd 服务器和写入目标组的文件。在内部,服务被表示为一个从服务名称到实例的映射。实例则是一个从 etcd 路径中的实例标识符到其地址的映射。
const servicesPrefix = "/services"
type (
instances map[string]string
services map[string]instances
)
var (
etcdServer = flag.String("server", "http://127.0.0.1:4001", "etcd server to connect to")
targetFile = flag.String("target-file", "tgroups.json", "the file that contains the target groups")
)
我们的 main
函数会解析标志并初始化保存当前服务的对象。然后,我们连接到 etcd 服务器,并对 /services
路径进行递归读取。我们收到给定路径的子树作为结果,并调用 srvs.handle
,它会为子树中的每个节点递归执行 srvs.update
方法。update
方法会修改我们的 srvs
对象的状态,使其与 etcd 中子树的状态保持一致。最后,我们调用 srvs.persist
,它将 srvs
对象转换为目标组列表,并将其写入由 -target-file
标志指定的文件中。
func main() {
flag.Parse()
var (
client = etcd.NewClient([]string{*etcdServer})
srvs = services{}
)
// Retrieve the subtree of the /services path.
res, err := client.Get(servicesPrefix, false, true)
if err != nil {
log.Fatalf("Error on initial retrieval: %s", err)
}
srvs.handle(res.Node, srvs.update)
srvs.persist()
}
假设我们已经有了一个可行的实现。我们现在可以每 30 秒运行一次这个工具,从而获得一个对我们服务发现中当前目标的大致准确的视图。
但我们能做得更好吗?
答案是肯定的。etcd 提供了 watch 机制,让我们能够监听任何路径及其子路径的更新。这样,我们就能立即获知变化并立即应用它们。我们也不必一次又一次地遍历整个 /services
子树,这对于大量的服务和实例来说可能变得非常重要。
我们对 main
函数进行如下扩展:
func main() {
// ...
updates := make(chan *etcd.Response)
// Start recursively watching for updates.
go func() {
_, err := client.Watch(servicesPrefix, 0, true, updates, nil)
if err != nil {
log.Errorln(err)
}
}()
// Apply updates sent on the channel.
for res := range updates {
log.Infoln(res.Action, res.Node.Key, res.Node.Value)
handler := srvs.update
if res.Action == "delete" {
handler = srvs.delete
}
srvs.handle(res.Node, handler)
srvs.persist()
}
}
我们启动一个 goroutine,递归地监视 /services
中条目的变化。它会永久阻塞,并将所有变化发送到 updates
通道。然后,我们从通道中读取更新,并像之前一样应用它。然而,如果一个实例或整个服务消失了,我们会调用 srvs.handle
,但使用的是 srvs.delete
方法。
每次更新后,我们都再次调用 srvs.persist
,将变更写入 Prometheus 正在监视的文件中。
修改方法
到目前为止,从概念上讲这是可行的。剩下的就是 update
和 delete
处理方法以及 persist
方法。
update
和 delete
由 handle
方法调用,该方法只是在路径有效的情况下,对子树中的每个节点调用它们。
var pathPat = regexp.MustCompile(`/services/([^/]+)(?:/(\d+))?`)
func (srvs services) handle(node *etcd.Node, handler func(*etcd.Node)) {
if pathPat.MatchString(node.Key) {
handler(node)
} else {
log.Warnf("unhandled key %q", node.Key)
}
if node.Dir {
for _, n := range node.Nodes {
srvs.handle(n, handler)
}
}
}
update
update 方法根据 etcd 中更新的节点来改变我们 services
对象的状态。
func (srvs services) update(node *etcd.Node) {
match := pathPat.FindStringSubmatch(node.Key)
// Creating a new job directory does not require any action.
if match[2] == "" {
return
}
srv := match[1]
instanceID := match[2]
// We received an update for an instance.
insts, ok := srvs[srv]
if !ok {
insts = instances{}
srvs[srv] = insts
}
insts[instanceID] = node.Value
}
delete
delete 方法根据从 etcd 中删除的节点,从我们的 services
对象中移除实例或整个作业。
func (srvs services) delete(node *etcd.Node) {
match := pathPat.FindStringSubmatch(node.Key)
srv := match[1]
instanceID := match[2]
// Deletion of an entire service.
if instanceID == "" {
delete(srvs, srv)
return
}
// Delete a single instance from the service.
delete(srvs[srv], instanceID)
}
persist
persist 方法将我们的 services
对象的状态转换为一个 TargetGroup
列表。然后,它将这个列表以 JSON 格式写入 -target-file
。
type TargetGroup struct {
Targets []string `json:"targets,omitempty"`
Labels map[string]string `json:"labels,omitempty"`
}
func (srvs services) persist() {
var tgroups []*TargetGroup
// Write files for current services.
for job, instances := range srvs {
var targets []string
for _, addr := range instances {
targets = append(targets, addr)
}
tgroups = append(tgroups, &TargetGroup{
Targets: targets,
Labels: map[string]string{"job": job},
})
}
content, err := json.Marshal(tgroups)
if err != nil {
log.Errorln(err)
return
}
f, err := create(*targetFile)
if err != nil {
log.Errorln(err)
return
}
defer f.Close()
if _, err := f.Write(content); err != nil {
log.Errorln(err)
}
}
上线运行
一切就绪,那我们该如何运行它呢?
我们只需启动我们的工具,并配置好输出文件:
./etcd_sd -target-file /etc/prometheus/tgroups.json
然后,我们使用相同的文件为 Prometheus 配置基于文件的服务发现。最简单的配置如下所示:
scrape_configs:
- job_name: 'default' # Will be overwritten by job label of target groups.
file_sd_configs:
- names: ['/etc/prometheus/tgroups.json']
就这样。现在,我们的 Prometheus 会随着服务及其实例在我们 etcd 服务发现中的加入和离开而保持同步。
结论
如果 Prometheus 没有为您的组织使用的服务发现提供原生支持,请不要灰心。通过一个小工具程序,您可以轻松地弥补这一差距,并从对监控目标的无缝更新中获益。这样,您就可以将监控配置的变更从部署流程中移除。
非常感谢我们的贡献者 Jimmy Dyson 和 Robert Jacob 添加了对 Kubernetes 和 Marathon 的原生支持。也请查看 Keegan C Smith 基于文件的 EC2 服务发现 实现。
您可以在 GitHub 上找到这篇博文的完整源代码。