Data is available at
Use <https://github.com/elasticsearch-dump/elasticsearch-dump> to import into Elastic.
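For example, a minimal restore with elasticdump might look like this (the dump filename and host are placeholders for your setup; the `proxy_stats` index name matches the default used by the script below): `elasticdump --input=proxy_stats.json --output=http://localhost:9200/proxy_stats --type=data`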
tally-costs.py
#!/usr/bin/env python3
import argparse
import csv
import locale
import logging
from datetime import datetime
from typing import Union
import elastic_transport
import numpy as np
from elasticsearch.helpers import scan
from elasticsearch_dsl import Search, Q
from pydantic import BaseModel
from tqdm import tqdm
from lib.config import ELASTIC_HOST, ELASTIC_INDEX, ELASTIC_API_KEY
from lib.elastic.client import ElasticClient
logging.basicConfig()
logger = logging.getLogger('MAIN')
logger.setLevel(logging.INFO)
_IS_SERVICE = False
_IS_TEST = False
_WRITE_CSVS = False
class Usage(BaseModel):
type: str
cost: Union[float, None] = None
tokens: int
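# Blended price per 1M tokens for each API type: the simple mean of the published
# input and output rates. calculate_costs() falls back to these estimates whenever
# the proxy did not record an actual cost for an API type.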
API_COSTS_PER_1M = {
# Input, output.
'turbo': np.mean([1.50, 2.00]),
'gpt4-turbo': np.mean([10.00, 30.00]),
'gpt4': np.mean([30.00, 60.00]),
'gpt4-32k': np.mean([60.00, 120.00]),
'gpt4o': np.mean([2.5, 10.00]),
'o1-mini': np.mean([3.00, 12.00]),
'o1': np.mean([15, 60]),
'claude': np.mean([8, 24]),
'claude-sonnet': np.mean([3, 15]),
'claude-opus': np.mean([15, 75]),
'gemini': np.mean([0.50, 1.50]),
'gemini-pro': np.mean([1.25, 5.00]),
'gemini-flash': np.mean([0.075, 0.30]),
'mistral-tiny': np.mean([2.5 / 4, 7.5 / 4]),
'mistral-small': np.mean([2.5 / 3, 7.5 / 3]),
'mistral-medium': np.mean([2.5 / 2, 7.5 / 2]),
'mistral-large': np.mean([2.5, 7.5]),
'aws-claude': np.mean([8, 24]),
'aws-claude-sonnet': np.mean([3, 15]),
'aws-claude-opus': np.mean([15, 75]),
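    # AWS quotes Mistral pricing per 1K tokens; multiply by 1000 to convert to per-1M.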
'aws-mistral-small': np.mean([0.001 * 1000, 0.003 * 1000]),
'aws-mistral-large': np.mean([0.004 * 1000, 0.012 * 1000]),
'azure-gpt4-32k': np.mean([60.00, 120.00]),
'azure-gpt4': np.mean([30.00, 60.00]),
'azure-gpt4-turbo': np.mean([10.00, 30.00]),
'azure-gpt4o': np.mean([5, 15]),
'azure-turbo': np.mean([1.5, 2]),
# These are ignored.
# Not doing image models because the API returns the tokens, not the number of images generated.
'dall-e': 0,
'azure-dall-e': 0,
'palm-bison': 0,
'bison': 0,
}
def dynamic_print(msg: str):
if _IS_SERVICE:
logger.info(msg)
else:
tqdm.write(msg)
def get_unique_urls(index_name="proxy_stats"):
"""
Retrieves all unique URLs from the specified Elasticsearch index.
:param index_name: Name of the index to search (default: "proxy_stats")
:return: A list of unique URLs
"""
s = Search(using=ElasticClient().client, index=index_name)
s.aggs.bucket('unique_urls', 'terms', field='url.keyword', size=10000)
s = s.extra(size=0)
response = s.execute()
unique_urls = [bucket.key for bucket in response.aggregations.unique_urls.buckets if bucket.key != 'proxy.chub-archive.evulid.cc']
return unique_urls
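# The proxy reports cumulative counters for its current run, so a drop in uptime
# between consecutive documents marks a restart. Each time that happens, the last
# snapshot taken before the restart is costed and folded into the running totals.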
def process_docs_by_url(index_name="proxy_stats", batch_size=1000):
"""
Processes documents for each unique URL in the index, sorted by timestamp.
:param index_name: Name of the index to search (default: "proxy_stats")
:param batch_size: Number of documents to fetch in each batch (default: 1000)
"""
es_client = ElasticClient().client
unique_urls = get_unique_urls(index_name)
grand_total = 0
main_pbar = tqdm(unique_urls, position=0, disable=_IS_SERVICE)
for url in main_pbar:
s = Search(using=es_client, index=index_name)
s = s.query(Q('term', url__keyword=url)).sort({"timestamp": {"order": "asc"}})
doc_count = s.count()
if _IS_SERVICE:
logger.info(f'Processing {doc_count} documents for {url}')
csv_rows = []
total_api_costs = {}
total_cost = 0
previous_doc = None
docs = scan(es_client, query=s.to_dict(), index=index_name, size=batch_size, preserve_order=True)
pbar = tqdm(docs, total=doc_count, desc=url, disable=_IS_SERVICE, leave=True, position=1)
i = 0
for current_doc in pbar:
assert current_doc['_source']['url'] == url
if _IS_TEST and i == 1400:
previous_doc = None
break
i += 1
current_timestamp = current_doc['_source']['timestamp']
current_timestamp_d = datetime.fromtimestamp(current_timestamp)
if previous_doc:
previous_timestamp = previous_doc['_source']['timestamp']
previous_timestamp_d = datetime.fromtimestamp(previous_timestamp)
if current_timestamp_d < previous_timestamp_d:
raise Exception(f'ASSERT FAILED: {current_timestamp_d} < {previous_timestamp_d}')
current_uptime = current_doc['_source']['uptime']
previous_uptime = previous_doc['_source']['uptime']
if previous_uptime > current_uptime:
# Process and save the previous document's stats (which were before the proxy restarted).
new_costs = determine_costs(previous_doc['_source'])
costs = combine_model_stats(total_api_costs, new_costs)
total_cost += calc_total_cost(costs)
timestamp = previous_timestamp_d.strftime('%m-%d-%Y %H:%M')
csv_rows.append([new_costs, timestamp, previous_uptime])
previous_doc = current_doc
pbar.set_postfix({
'Timestamp': current_timestamp_d.strftime('%m-%d-%Y %H:%M'),
'Total Cost': f'{locale.currency(total_cost, grouping=True)}'
})
main_pbar.set_postfix({
'Grand Total': f'{locale.currency(grand_total, grouping=True)}'
})
# Process the last document
if previous_doc:
new_costs = determine_costs(previous_doc['_source'])
costs = combine_model_stats(total_api_costs, new_costs)
total_cost += calc_total_cost(costs)
timestamp = datetime.fromtimestamp(previous_doc['_source']['timestamp']).strftime('%m-%d-%Y %H:%M')
csv_rows.append([new_costs, timestamp, previous_doc['_source']['uptime']])
grand_total += total_cost
if _WRITE_CSVS and len(csv_rows):
write_to_csv(csv_rows, filename=f'{url.replace(".", "-")}.csv')
if _IS_SERVICE:
logger.info(f'Total cost for {url}: {locale.currency(total_cost, grouping=True)}')
return grand_total
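# Extract the per-API usage blocks from a stats document. A recorded cost of zero is
# treated as unknown so calculate_costs() can estimate it from the token count instead.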
def determine_costs(doc):
costs = {}
for k, v in doc.items():
if isinstance(v, dict) and v.get('usage'):
usage = Usage(type=k, cost=v['usage']['cost'] if v['usage']['cost'] > 0 else None, tokens=v['usage']['tokens'])
if k not in costs:
costs[k] = []
costs[k].append(usage)
return calculate_costs(costs)
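# Sum recorded costs and token counts per API type; any type with no recorded cost is
# priced at the blended API_COSTS_PER_1M rate based on its token count.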
def calculate_costs(data: dict):
results = {}
for k, v in data.items():
if k not in results:
results[k] = {
'cost': 0,
'tokens': 0,
}
for x in v:
if x.cost:
results[k]['cost'] = results[k]['cost'] + x.cost
results[k]['tokens'] = results[k]['tokens'] + x.tokens
for api_type, v in results.items():
if v['cost'] == 0:
v['cost'] = (v['tokens'] / 1_000_000) * API_COSTS_PER_1M[api_type]
return results
def calc_total_cost(data: dict):
total = 0
for k, v in data.items():
total += v['cost']
return np.round(total, 2)
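# Merge two per-model {cost, tokens} dicts by summing the fields model by model.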
def combine_model_stats(dict1: dict, dict2: dict):
result = {}
all_models = set(dict1.keys()) | set(dict2.keys())
for model in all_models:
result[model] = {
'cost': (dict1.get(model, {}).get('cost', 0) +
dict2.get(model, {}).get('cost', 0)),
'tokens': (dict1.get(model, {}).get('tokens', 0) +
dict2.get(model, {}).get('tokens', 0))
}
return result
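# Flatten the per-snapshot stats into CSV rows: one row per pre-restart snapshot, with
# timestamp, uptime, and cost/token columns for every model seen in the data.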
def write_to_csv(data: list, filename: str):
all_models = set()
for row in data:
all_models.update(row[0].keys())
header = ['timestamp', 'uptime_seconds'] + [f'{model}_{metric}' for model in sorted(all_models) for metric in ['cost', 'tokens']]
with open(filename, 'w', newline='') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(header)
for row in data:
model_data = row[0]
timestamp = row[1]
if isinstance(timestamp, (int, float)):
timestamp = datetime.fromtimestamp(timestamp).strftime('%m-%d-%Y %H:%M')
csv_row = [timestamp]
# Add uptime (if present, otherwise use 0)
csv_row.append(row[2] if len(row) > 2 else 0)
for model in sorted(all_models):
if model in model_data:
csv_row.extend([model_data[model].get('cost', 0), model_data[model].get('tokens', 0)])
else:
csv_row.extend([0, 0]) # Add zeros for missing models
writer.writerow(csv_row)
dynamic_print(f"Data has been written to {filename}")
def main(args):
try:
ElasticClient.initialise(ELASTIC_HOST, ELASTIC_INDEX, ELASTIC_API_KEY)
except elastic_transport.ConnectionError as e:
logger.critical(f'Failed to connect to Elastic: {e}')
quit(1)
locale.setlocale(locale.LC_ALL, '')
logger.info('Fetching URLs from Elastic...')
unique_urls = get_unique_urls()
logger.info(f'Found {len(unique_urls)} unique URLs.')
grand_total = process_docs_by_url()
dynamic_print(f'Total wasted....... {locale.currency(grand_total, grouping=True)}')
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('-s', '--service', action='store_true', help='Run as a service.')
    parser.add_argument('-t', '--test', action='store_true', help='Stop after 1400 documents per URL (for testing).')
parser.add_argument('--write-csv', action='store_true', help='Write the CSVs.')
args = parser.parse_args()
_IS_SERVICE = args.service
_IS_TEST = args.test
_WRITE_CSVS = args.write_csv
main(args)
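Typical invocations, going by the flags the script defines: `python3 tally-costs.py` for an interactive run with progress bars, `python3 tally-costs.py --write-csv` to also write one CSV per proxy URL, and `python3 tally-costs.py --service` to log progress lines instead of drawing progress bars (e.g. when run under a service manager).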