请参与 Prometheus 用户调研(2026 年 3 月版) ,帮助社区确定未来开发工作的优先级!

使用 etcd 实现自定义服务发现

2015年8月17日作者 Fabian Reinartz

上一篇文章中,我们介绍了 Prometheus 中多种实现服务发现的新方法。从那时起,发生了很多变化。我们改进了内部实现,并收到了来自社区的巨大贡献,增加了对 Kubernetes 和 Marathon 服务发现的支持。它们将在 0.16 版本发布时提供。

我们还探讨了自定义服务发现这一主题。

并非每种服务发现类型都具备足够的通用性,能够直接包含在 Prometheus 中。很有可能你的组织有自己的一套专用系统,你只需要让它与 Prometheus 协同工作即可。这并不意味着你无法享受自动发现新监控目标所带来的便利。

在这篇文章中,我们将实现一个小工具,将基于 etcd (一种高一致性的分布式键值存储)的自定义服务发现方法连接到 Prometheus。

etcd 与 Prometheus 中的目标

我们虚构的服务发现系统将服务及其其实例存储在一个定义良好的键结构下

/services/<service_name>/<instance_id> = <instance_address>

Prometheus 现在应该可以自动添加和删除所有现有服务的目标,随着它们的上线和下线。我们可以利用 Prometheus 基于文件的服务发现功能,该功能通过监控一组描述目标(以 JSON 格式的目标组列表)的文件来工作。

单个目标组由一组与标签相关联的地址列表组成。这些标签会附加到从这些目标获取的所有时间序列上。一个从我们的 etcd 服务发现中提取的目标组示例如下:

{
  "targets": ["10.0.33.1:54423", "10.0.34.12:32535"],
  "labels": {
    "job": "node_exporter"
  }
}

程序实现

我们需要一个小程序来连接到 etcd 集群,查询 /services 路径下找到的所有服务,并将它们写入到目标组文件中。

让我们从基础工作开始。我们的工具有两个标志:连接的 etcd 服务器以及写入目标组的文件。在内部,服务被表示为从服务名称到实例的映射。实例则是从 etcd 路径中的实例标识符到其地址的映射。

const servicesPrefix = "/services"

type (
  instances map[string]string
  services  map[string]instances
)

var (
  etcdServer = flag.String("server", "http://127.0.0.1:4001", "etcd server to connect to")
  targetFile = flag.String("target-file", "tgroups.json", "the file that contains the target groups")
)

我们的 main 函数解析标志并初始化保存当前服务的对象。然后我们连接到 etcd 服务器,并对 /services 路径进行递归读取。我们接收给定路径的子树作为结果,并调用 srvs.handle,它会为子树中的每个节点递归执行 srvs.update 方法。update 方法会修改 srvs 对象的状态,使其与 etcd 中子树的状态保持一致。最后,我们调用 srvs.persist,它将 srvs 对象转换为目标组列表,并将其写入由 -target-file 标志指定的文件中。

func main() {
  flag.Parse()

  var (
    client  = etcd.NewClient([]string{*etcdServer})
    srvs    = services{}
  )

  // Retrieve the subtree of the /services path.
  res, err := client.Get(servicesPrefix, false, true)
  if err != nil {
    log.Fatalf("Error on initial retrieval: %s", err)
  }
  srvs.handle(res.Node, srvs.update)
  srvs.persist()
}

假设我们已经完成了这个实现,我们现在可以每 30 秒运行一次此工具,从而获得服务发现中当前目标的较为准确的视图。

但我们可以做得更好吗?

答案是肯定的。etcd 提供了监听(watches)功能,让我们能够监听任何路径及其子路径的更新。有了它,我们就能立即获知更改并即时应用。我们也不必一遍又一遍地处理整个 /services 子树,这对于拥有大量服务和实例的场景来说非常重要。

我们将 main 函数扩展如下:

func main() {
  // ...

  updates := make(chan *etcd.Response)

  // Start recursively watching for updates.
  go func() {
    _, err := client.Watch(servicesPrefix, 0, true, updates, nil)
    if err != nil {
      log.Errorln(err)
    }
  }()

  // Apply updates sent on the channel.
  for res := range updates {
    log.Infoln(res.Action, res.Node.Key, res.Node.Value)

    handler := srvs.update
    if res.Action == "delete" {
      handler = srvs.delete
    }
    srvs.handle(res.Node, handler)
    srvs.persist()
  }
}

我们启动一个 goroutine,递归监听 /services 中条目的变更。它会永久阻塞并将所有变更发送到 updates 通道。然后我们从通道读取更新并像之前一样应用它们。但是,如果实例或整个服务消失,我们会使用 srvs.delete 方法调用 srvs.handle

我们在每次更新结束时再次调用 srvs.persist,将更改写入 Prometheus 正在监控的文件中。

修改方法

目前为止一切顺利——在概念上这是可行的。剩下的就是 updatedelete 处理方法,以及 persist 方法。

updatedeletehandle 方法调用,该方法只是在路径有效的情况下为子树中的每个节点调用它们。

var pathPat = regexp.MustCompile(`/services/([^/]+)(?:/(\d+))?`)

func (srvs services) handle(node *etcd.Node, handler func(*etcd.Node)) {
  if pathPat.MatchString(node.Key) {
    handler(node)
  } else {
    log.Warnf("unhandled key %q", node.Key)
  }

  if node.Dir {
    for _, n := range node.Nodes {
      srvs.handle(n, handler)
    }
  }
}

update

更新方法根据在 etcd 中更新的节点来修改我们的 services 对象状态。

func (srvs services) update(node *etcd.Node) {
  match := pathPat.FindStringSubmatch(node.Key)
  // Creating a new job directory does not require any action.
  if match[2] == "" {
    return
  }
  srv := match[1]
  instanceID := match[2]

  // We received an update for an instance.
  insts, ok := srvs[srv]
  if !ok {
    insts = instances{}
    srvs[srv] = insts
  }
  insts[instanceID] = node.Value
}

delete

删除方法根据从 etcd 中删除的节点,从我们的 services 对象中移除实例或整个作业。

func (srvs services) delete(node *etcd.Node) {
  match := pathPat.FindStringSubmatch(node.Key)
  srv := match[1]
  instanceID := match[2]

  // Deletion of an entire service.
  if instanceID == "" {
    delete(srvs, srv)
    return
  }

  // Delete a single instance from the service.
  delete(srvs[srv], instanceID)
}

persist

持久化方法将我们的 services 对象状态转换为 TargetGroup 列表,然后以 JSON 格式将其写入 -target-file 中。

type TargetGroup struct {
  Targets []string          `json:"targets,omitempty"`
  Labels  map[string]string `json:"labels,omitempty"`
}

func (srvs services) persist() {
  var tgroups []*TargetGroup
  // Write files for current services.
  for job, instances := range srvs {
    var targets []string
    for _, addr := range instances {
      targets = append(targets, addr)
    }

    tgroups = append(tgroups, &TargetGroup{
      Targets: targets,
      Labels:  map[string]string{"job": job},
    })
  }

  content, err := json.Marshal(tgroups)
  if err != nil {
    log.Errorln(err)
    return
  }

  f, err := create(*targetFile)
  if err != nil {
    log.Errorln(err)
    return
  }
  defer f.Close()

  if _, err := f.Write(content); err != nil {
    log.Errorln(err)
  }
}

上线运行

全部完成,那么我们该如何运行它呢?

我们只需使用配置好的输出文件启动我们的工具即可:

./etcd_sd -target-file /etc/prometheus/tgroups.json

然后,我们配置 Prometheus 使用基于文件的服务发现,并指定同一个文件。最简单的配置如下:

scrape_configs:
- job_name: 'default' # Will be overwritten by job label of target groups.
  file_sd_configs:
  - names: ['/etc/prometheus/tgroups.json']

就是这样。现在,我们的 Prometheus 可以与在 etcd 服务发现中加入或离开的服务及其其实例保持同步。

结论

如果 Prometheus 没有原生支持您组织的服务发现方式,请不要灰心。使用一个小巧的实用程序,您可以轻松弥补这一差距,并从受监控目标的无缝更新中获益。因此,您可以将监控配置的变更从部署工作中移除。

非常感谢我们的贡献者 Jimmy Dyson Robert Jacob  为添加 Kubernetes Marathon  的原生支持。也请查看 Keegan C Smith  基于文件的 EC2 服务发现  方案。

您可以在 GitHub 上找到本篇博客的完整源代码