Последняя активность 1729656337

README.md Исходник
Data is available at Use <https://github.com/elasticsearch-dump/elasticsearch-dump> to import into Elastic.
tally-costs.py Исходник
1#!/usr/bin/env python3
2import argparse
3import csv
4import locale
5import logging
6from datetime import datetime
7from typing import Union
8
9import elastic_transport
10import numpy as np
11from elasticsearch.helpers import scan
12from elasticsearch_dsl import Search, Q
13from pydantic import BaseModel
14from tqdm import tqdm
15
16from lib.config import ELASTIC_HOST, ELASTIC_INDEX, ELASTIC_API_KEY
17from lib.elastic.client import ElasticClient
18
19logging.basicConfig()
20logger = logging.getLogger('MAIN')
21logger.setLevel(logging.INFO)
22
23_IS_SERVICE = False
24_IS_TEST = False
25_WRITE_CSVS = False
26
27
28class Usage(BaseModel):
29 type: str
30 cost: Union[float, None] = None
31 tokens: int
32
33
34API_COSTS_PER_1M = {
35 # Input, output.
36 'turbo': np.mean([1.50, 2.00]),
37 'gpt4-turbo': np.mean([10.00, 30.00]),
38 'gpt4': np.mean([30.00, 60.00]),
39 'gpt4-32k': np.mean([60.00, 120.00]),
40 'gpt4o': np.mean([2.5, 10.00]),
41 'o1-mini': np.mean([3.00, 12.00]),
42 'o1': np.mean([15, 60]),
43 'claude': np.mean([8, 24]),
44 'claude-sonnet': np.mean([3, 15]),
45 'claude-opus': np.mean([15, 75]),
46 'gemini': np.mean([0.50, 1.50]),
47 'gemini-pro': np.mean([1.25, 5.00]),
48 'gemini-flash': np.mean([0.075, 0.30]),
49 'mistral-tiny': np.mean([2.5 / 4, 7.5 / 4]),
50 'mistral-small': np.mean([2.5 / 3, 7.5 / 3]),
51 'mistral-medium': np.mean([2.5 / 2, 7.5 / 2]),
52 'mistral-large': np.mean([2.5, 7.5]),
53 'aws-claude': np.mean([8, 24]),
54 'aws-claude-sonnet': np.mean([3, 15]),
55 'aws-claude-opus': np.mean([15, 75]),
56 'aws-mistral-small': np.mean([0.001 * 1000, 0.003 * 1000]),
57 'aws-mistral-large': np.mean([0.004 * 1000, 0.012 * 1000]),
58 'azure-gpt4-32k': np.mean([60.00, 120.00]),
59 'azure-gpt4': np.mean([30.00, 60.00]),
60 'azure-gpt4-turbo': np.mean([10.00, 30.00]),
61 'azure-gpt4o': np.mean([5, 15]),
62 'azure-turbo': np.mean([1.5, 2]),
63
64 # These are ignored.
65 # Not doing image models because the API returns the tokens, not the number of images generated.
66 'dall-e': 0,
67 'azure-dall-e': 0,
68 'palm-bison': 0,
69 'bison': 0,
70}
71
72
73def dynamic_print(msg: str):
74 if _IS_SERVICE:
75 logger.info(msg)
76 else:
77 tqdm.write(msg)
78
79
80def get_unique_urls(index_name="proxy_stats"):
81 """
82 Retrieves all unique URLs from the specified Elasticsearch index.
83
84 :param index_name: Name of the index to search (default: "proxy_stats")
85 :return: A list of unique URLs
86 """
87 s = Search(using=ElasticClient().client, index=index_name)
88 s.aggs.bucket('unique_urls', 'terms', field='url.keyword', size=10000)
89 s = s.extra(size=0)
90 response = s.execute()
91 unique_urls = [bucket.key for bucket in response.aggregations.unique_urls.buckets if bucket.key != 'proxy.chub-archive.evulid.cc']
92 return unique_urls
93
94
95def process_docs_by_url(index_name="proxy_stats", batch_size=1000):
96 """
97 Processes documents for each unique URL in the index, sorted by timestamp.
98
99 :param index_name: Name of the index to search (default: "proxy_stats")
100 :param batch_size: Number of documents to fetch in each batch (default: 1000)
101 """
102 es_client = ElasticClient().client
103 unique_urls = get_unique_urls(index_name)
104 grand_total = 0
105
106 main_pbar = tqdm(unique_urls, position=0, disable=_IS_SERVICE)
107
108 for url in main_pbar:
109 s = Search(using=es_client, index=index_name)
110 s = s.query(Q('term', url__keyword=url)).sort({"timestamp": {"order": "asc"}})
111 doc_count = s.count()
112
113 if _IS_SERVICE:
114 logger.info(f'Processing {doc_count} documents for {url}')
115
116 csv_rows = []
117 total_api_costs = {}
118 total_cost = 0
119 previous_doc = None
120
121 docs = scan(es_client, query=s.to_dict(), index=index_name, size=batch_size, preserve_order=True)
122 pbar = tqdm(docs, total=doc_count, desc=url, disable=_IS_SERVICE, leave=True, position=1)
123
124 i = 0
125 for current_doc in pbar:
126 assert current_doc['_source']['url'] == url
127 if _IS_TEST and i == 1400:
128 previous_doc = None
129 break
130 i += 1
131
132 current_timestamp = current_doc['_source']['timestamp']
133 current_timestamp_d = datetime.fromtimestamp(current_timestamp)
134
135 if previous_doc:
136 previous_timestamp = previous_doc['_source']['timestamp']
137 previous_timestamp_d = datetime.fromtimestamp(previous_timestamp)
138 if current_timestamp_d < previous_timestamp_d:
139 raise Exception(f'ASSERT FAILED: {current_timestamp_d} < {previous_timestamp_d}')
140 current_uptime = current_doc['_source']['uptime']
141 previous_uptime = previous_doc['_source']['uptime']
142 if previous_uptime > current_uptime:
143 # Process and save the previous document's stats (which were before the proxy restarted).
144 new_costs = determine_costs(previous_doc['_source'])
145 costs = combine_model_stats(total_api_costs, new_costs)
146 total_cost += calc_total_cost(costs)
147 timestamp = previous_timestamp_d.strftime('%m-%d-%Y %H:%M')
148 csv_rows.append([new_costs, timestamp, previous_uptime])
149
150 previous_doc = current_doc
151 pbar.set_postfix({
152 'Timestamp': current_timestamp_d.strftime('%m-%d-%Y %H:%M'),
153 'Total Cost': f'{locale.currency(total_cost, grouping=True)}'
154 })
155 main_pbar.set_postfix({
156 'Grand Total': f'{locale.currency(grand_total, grouping=True)}'
157 })
158
159 # Process the last document
160 if previous_doc:
161 new_costs = determine_costs(previous_doc['_source'])
162 costs = combine_model_stats(total_api_costs, new_costs)
163 total_cost += calc_total_cost(costs)
164 timestamp = datetime.fromtimestamp(previous_doc['_source']['timestamp']).strftime('%m-%d-%Y %H:%M')
165 csv_rows.append([new_costs, timestamp, previous_doc['_source']['uptime']])
166
167 grand_total += total_cost
168 if _WRITE_CSVS and len(csv_rows):
169 write_to_csv(csv_rows, filename=f'{url.replace(".", "-")}.csv')
170 if _IS_SERVICE:
171 logger.info(f'Total cost for {url}: {locale.currency(total_cost, grouping=True)}')
172
173 return grand_total
174
175
176def determine_costs(doc):
177 costs = {}
178 for k, v in doc.items():
179 if isinstance(v, dict) and v.get('usage'):
180 usage = Usage(type=k, cost=v['usage']['cost'] if v['usage']['cost'] > 0 else None, tokens=v['usage']['tokens'])
181 if k not in costs:
182 costs[k] = []
183 costs[k].append(usage)
184 return calculate_costs(costs)
185
186
187def calculate_costs(data: dict):
188 results = {}
189 for k, v in data.items():
190 if k not in results:
191 results[k] = {
192 'cost': 0,
193 'tokens': 0,
194 }
195 for x in v:
196 if x.cost:
197 results[k]['cost'] = results[k]['cost'] + x.cost
198 results[k]['tokens'] = results[k]['tokens'] + x.tokens
199
200 for api_type, v in results.items():
201 if v['cost'] == 0:
202 v['cost'] = (v['tokens'] / 1_000_000) * API_COSTS_PER_1M[api_type]
203
204 return results
205
206
207def calc_total_cost(data: dict):
208 total = 0
209 for k, v in data.items():
210 total += v['cost']
211 return np.round(total, 2)
212
213
214def combine_model_stats(dict1: dict, dict2: dict):
215 result = {}
216 all_models = set(dict1.keys()) | set(dict2.keys())
217 for model in all_models:
218 result[model] = {
219 'cost': (dict1.get(model, {}).get('cost', 0) +
220 dict2.get(model, {}).get('cost', 0)),
221 'tokens': (dict1.get(model, {}).get('tokens', 0) +
222 dict2.get(model, {}).get('tokens', 0))
223 }
224 return result
225
226
227def write_to_csv(data: list, filename: str):
228 all_models = set()
229 for row in data:
230 all_models.update(row[0].keys())
231
232 header = ['timestamp', 'uptime_seconds'] + [f'{model}_{metric}' for model in sorted(all_models) for metric in ['cost', 'tokens']]
233
234 with open(filename, 'w', newline='') as csvfile:
235 writer = csv.writer(csvfile)
236 writer.writerow(header)
237
238 for row in data:
239 model_data = row[0]
240 timestamp = row[1]
241
242 if isinstance(timestamp, (int, float)):
243 timestamp = datetime.fromtimestamp(timestamp).strftime('%m-%d-%Y %H:%M')
244
245 csv_row = [timestamp]
246
247 # Add uptime (if present, otherwise use 0)
248 csv_row.append(row[2] if len(row) > 2 else 0)
249
250 for model in sorted(all_models):
251 if model in model_data:
252 csv_row.extend([model_data[model].get('cost', 0), model_data[model].get('tokens', 0)])
253 else:
254 csv_row.extend([0, 0]) # Add zeros for missing models
255
256 writer.writerow(csv_row)
257
258 dynamic_print(f"Data has been written to {filename}")
259
260
261def main(args):
262 try:
263 ElasticClient.initialise(ELASTIC_HOST, ELASTIC_INDEX, ELASTIC_API_KEY)
264 except elastic_transport.ConnectionError as e:
265 logger.critical(f'Failed to connect to Elastic: {e}')
266 quit(1)
267
268 locale.setlocale(locale.LC_ALL, '')
269
270 logger.info('Fetching URLs from Elastic...')
271 unique_urls = get_unique_urls()
272 logger.info(f'Found {len(unique_urls)} unique URLs.')
273 grand_total = process_docs_by_url()
274 dynamic_print(f'Total wasted....... {locale.currency(grand_total, grouping=True)}')
275
276
277if __name__ == '__main__':
278 parser = argparse.ArgumentParser()
279 parser.add_argument('-s', '--service', action='store_true', help='Run as a service.')
280 parser.add_argument('-t', '--test', action='store_true')
281 parser.add_argument('--write-csv', action='store_true', help='Write the CSVs.')
282 args = parser.parse_args()
283 _IS_SERVICE = args.service
284 _IS_TEST = args.test
285 _WRITE_CSVS = args.write_csv
286 main(args)