for t in gauge_buffer: t.labels(instance='{}:{}'.format('ip', 'port')).set(data[index_buffer[gauge_buffer.index(t)]]) # generate_latest更新列表 res_list.append(prometheus_client.generate_latest(t))
跑一下代码 发现可行… but 当第二次访问的时候 View 函数会重新调用 Gauge() 方法 初始化 第二次访问接口就会报错 google 了好久 没有合适的轮子 只好自己想了个蠢的办法 (ps:肯定有更好的方法 我一直在寻找) 在 flask 初始化服务的时候 先初始化 Gauge 同时定义一个列表 来记录 Gauge 实例的 index 然后在 view 函数被调用的时候更新数据 再根据 index 索引到 Gauge 最后 使用 prometheus_client.generate_latest() 返回 response
1 2
ele = Gauge('{}'.format(ele), '', ['instance', ]) gauge_buffer.append(ele)
import prometheus_client from prometheus_client import Gauge from flask import Response, Flask
test = {r'monitor': 1} app = Flask(__name__) gauges = test # Gauge列表 gauge_buffer = [] # 位置列表 index_buffer = [] for ele in gauges: # 记录Gauge 列表位置 index_buffer.append(ele) # 循环生成Gauge实例,添加到列表 ele = Gauge('{}'.format(ele), '', ['instance', ]) gauge_buffer.append(ele)
@app.route('/metrics') defmetrics(): # 每次访问调用 test data = test res_list = [] for t in gauge_buffer: t.labels(instance='{}:{}'.format('ip', 'port')).set(data[index_buffer[gauge_buffer.index(t)]]) # generate_latest更新列表 res_list.append(prometheus_client.generate_latest(t)) return Response(res_list, mimetype='text/plain') pass
if __name__ == '__main__': app.debug = True app.run(host='localhost', port=10110) pass