import csv
import ipaddress
import math
import os
import random
import time

import pandas as pd
import requests
from location_ipfire_db_reader import LocationDatabase
from location_ipfire_db_reader.exceptions import IPAddressError
# Runtime configuration. Every value can be overridden through an environment
# variable; the defaults below are the original developer's local paths.

# CSV file holding the reference IP-range -> country data.
source_file_path = os.getenv(
    'SOURCE_FILE_PATH',
    'D:\\Dev\\GeoLocationData\\GeoDB_merger\\source\\dbip-country-lite-2024-01.csv',
)

# Log written while cross-checking against the IPFire location database.
log_file_path_ipfire = os.getenv(
    'LOG_FILE_PATH_IPFIRE',
    'D:\\Dev\\GeoLocationData\\GeoDB_merger\\log\\random_pickIPaddr_checkwith_IPFireDB.log',
)

# Log written while cross-checking against the IP-API service.
# The override used to be read only from the mixed-case name
# 'LOG_file_path_IP_API', which does not match the upper-case spelling of the
# other variables on platforms with case-sensitive environments. The upper-case
# name is now preferred and the legacy spelling is kept as a fallback.
log_file_path_IP_API = os.getenv(
    'LOG_FILE_PATH_IP_API',
    os.getenv(
        'LOG_file_path_IP_API',
        'D:\\Dev\\GeoLocationData\\GeoDB_merger\\log\\random_pickIPaddr_checkwith_IP-API.log',
    ),
)

# Request budget used to pace calls to the IP-API batch endpoint.
requests_per_minute = int(os.getenv('REQUESTS_PER_MINUTE', '15'))
def is_ipv4(address):
    """Tell whether *address* is accepted by ``ipaddress.IPv4Address``.

    Returns True when the constructor succeeds and False when it raises
    ``ipaddress.AddressValueError``.
    """
    try:
        ipaddress.IPv4Address(address)
    except ipaddress.AddressValueError:
        return False
    else:
        return True
def calculate_sample_size(total, confidence_level, margin_error, p=0.5):
    """Return the number of records to sample from a population of *total*.

    The unbounded size ``z**2 * p * (1 - p) / margin_error**2`` is computed
    first and then reduced with the finite-population correction
    ``n / (1 + (n - 1) / total)``. The result is rounded up.

    Args:
        total: Population size; must be a positive ``int``.
        confidence_level: One of 90, 95 or 99 (percent).
        margin_error: Acceptable margin of error as a positive fraction
            (for example ``0.05`` for 5 %).
        p: Expected population proportion, between 0 and 1 inclusive.

    Returns:
        The sample size as an ``int``. It is 0 when *p* is 0 or 1, because
        the variance term ``p * (1 - p)`` vanishes.

    Raises:
        ValueError: If any argument fails the validation described above.
    """
    # Single source of truth for both validation and the z-score lookup.
    z_scores = {90: 1.645, 95: 1.96, 99: 2.576}

    if not isinstance(total, int) or total <= 0:
        raise ValueError("Total must be a positive integer")
    if confidence_level not in z_scores:
        raise ValueError("Confidence level must be 90, 95, or 99.")
    if not isinstance(margin_error, (float, int)) or margin_error <= 0:
        raise ValueError("Margin error must be a positive number.")
    if not isinstance(p, (float, int)) or not (0 <= p <= 1):
        raise ValueError("Expected population proportion 'p' must be a number between 0 and 1.")

    z = z_scores[confidence_level]
    sample_size = ((z**2) * p * (1-p)) / (margin_error**2)
    if sample_size == 0:
        # With total == 1 the correction below would evaluate 0 / 0.
        return 0

    # Finite-population correction.
    sample_size = sample_size / (1 + ((sample_size - 1) / total))
    return math.ceil(sample_size)
def calculate_accuracy(log_file_path_ipfire):
    """Return the percentage of log records marked as a match.

    Each record is expected to look like ``ip, expected, actual, marker``
    where the marker is ``O`` for a match and ``X`` otherwise, which is the
    format written by the check functions in this file.

    Only the last comma-separated field is inspected. A substring search for
    ``', O'`` would also hit mismatch rows whose country code merely starts
    with "O" (for example ``", OM"``) and overstate the accuracy.

    Args:
        log_file_path_ipfire: Path of the log file to read.

    Returns:
        The share of matching records as a percentage between 0 and 100,
        or 0 when the file contains no records. Blank lines are ignored.
    """
    match_count = 0
    total_count = 0
    with open(log_file_path_ipfire, 'r') as file:
        for line in file:
            record = line.strip()
            if not record:
                continue
            total_count += 1
            marker = record.rsplit(',', 1)[-1].strip()
            if marker == 'O':
                match_count += 1
    if total_count == 0:
        return 0
    return (match_count / total_count) * 100
def ReadSourceCSVfile(csv_file_path):
    """Load the IPv4 ranges of a ``start_ip,end_ip,country`` CSV file.

    Rows whose start or end address is not a valid IPv4 address (for example
    IPv6 ranges) are skipped, as are blank lines.

    Args:
        csv_file_path: Path of the CSV file to read.

    Returns:
        A ``pandas.DataFrame`` with the columns ``start_ip``, ``end_ip``,
        ``country``, ``start_ip_int`` and ``end_ip_int``. The columns are
        present even when no IPv4 row was found, so callers can always
        filter on them.

    Raises:
        ValueError: If a non-blank row does not have exactly three fields.
    """
    columns = ['start_ip', 'end_ip', 'country', 'start_ip_int', 'end_ip_int']
    data = []
    # newline='' lets the csv module do its own line-ending handling.
    with open(csv_file_path, 'r', newline='') as file:
        for line_number, fields in enumerate(csv.reader(file), start=1):
            fields = [field.strip() for field in fields]
            if not any(fields):
                continue
            if len(fields) != 3:
                raise ValueError(
                    f"{csv_file_path}: line {line_number}: expected 3 fields "
                    f"(start_ip,end_ip,country), got {len(fields)}"
                )
            start_ip, end_ip, country = fields
            # Parse each address once; a failure means the row is not IPv4.
            try:
                start_ip_int = int(ipaddress.IPv4Address(start_ip))
                end_ip_int = int(ipaddress.IPv4Address(end_ip))
            except ipaddress.AddressValueError:
                continue
            data.append({
                'start_ip': start_ip,
                'end_ip': end_ip,
                'country': country,
                'start_ip_int': start_ip_int,
                'end_ip_int': end_ip_int,
            })
    return pd.DataFrame(data, columns=columns)
def generate_random_ips(df, sample_size, batch_size=100):
    """Draw *sample_size* random IPv4 addresses from the ranges in *df*.

    For every address one row of *df* is picked with ``df.sample()`` and an
    integer is then drawn with ``random.randint`` between that row's
    ``start_ip_int`` and ``end_ip_int`` (both inclusive).

    Args:
        df: DataFrame providing ``start_ip_int`` and ``end_ip_int`` columns.
        sample_size: Number of addresses to generate.
        batch_size: Accepted but not used by this function.

    Returns:
        A list of dotted-quad address strings of length *sample_size*.
    """
    def _draw_address():
        chosen = df.sample().iloc[0]
        low = chosen['start_ip_int']
        high = chosen['end_ip_int']
        return str(ipaddress.IPv4Address(random.randint(low, high)))

    return [_draw_address() for _ in range(sample_size)]
def check_country_with_ipfire_db(df, ips, log_file_path_ipfire, db):
    """Compare the country recorded in *df* with ``db.find_country`` per IP.

    One line ``ip, expected, actual, marker`` is written to the log for every
    address, where the marker is ``O`` when both country codes are equal and
    ``X`` otherwise. The log file is truncated first and flushed after each
    line, and a progress message is printed per address.

    Args:
        df: DataFrame with ``start_ip_int``, ``end_ip_int`` and ``country``.
        ips: Sequence of IPv4 address strings to check.
        log_file_path_ipfire: Path of the log file to (over)write.
        db: Object exposing ``find_country(ip)``.

    Returns:
        A ``(match_count, processed)`` tuple.
    """
    hits = 0
    processed = 0
    with open(log_file_path_ipfire, 'w') as log_file:
        for processed, ip in enumerate(ips, start=1):
            address_value = int(ipaddress.IPv4Address(ip))

            # Expected code: first range in df that contains the address.
            in_range = (df['start_ip_int'] <= address_value) & (df['end_ip_int'] >= address_value)
            covering = df[in_range]
            if covering.empty:
                expected = 'N/A'
            else:
                expected = covering.iloc[0]['country']

            # Actual code: whatever the database reports for the address.
            try:
                actual = db.find_country(ip)
            except IPAddressError:
                actual = 'N/A'

            if expected == actual:
                marker = 'O'
                hits += 1
            else:
                marker = 'X'

            log_file.write(f"{ip}, {expected}, {actual}, {marker}" + '\n')
            log_file.flush()
            print(f"Processed {processed}/{len(ips)} IPs")
    return hits, processed
def _ip_api_rate_limit_delay(headers):
    """Return the seconds to pause before the next IP-API batch request."""
    def _header_as_int(name, default):
        raw = headers.get(name)
        if raw is None:
            print(f"Warning: '{name}' header is missing. Defaulting to {default}.")
            return default
        try:
            return int(raw)
        except ValueError:
            print(f"Warning: '{name}' header is not an integer ({raw!r}). Defaulting to {default}.")
            return default

    remaining = _header_as_int('X-Rl', 0)
    reset_after = _header_as_int('X-Ttl', 60)
    if remaining == 0:
        # Budget exhausted: wait until the window resets.
        return max(reset_after, 0)
    # Otherwise spread requests evenly over the configured per-minute budget.
    return 60 / requests_per_minute


def check_country_with_ip_api(df, ips, log_file_path, batch_size=100):
    """Compare the country recorded in *df* with the IP-API batch endpoint.

    The addresses are posted in batches of *batch_size*. For every answered
    address a line ``ip, expected, actual, marker`` is written to the log,
    where the marker is ``O`` when both country codes are equal and ``X``
    otherwise; an address the service could not resolve is logged as
    ``ip, N/A, N/A, X``. The log file is truncated first and flushed after
    each line.

    After every response, successful or not, the function sleeps according
    to the ``X-Rl`` / ``X-Ttl`` response headers so that a rejected batch is
    not followed immediately by another request. A batch whose request fails,
    returns a non-200 status, or cannot be decoded is reported on stdout and
    skipped; its addresses are neither logged nor counted.

    Args:
        df: DataFrame with ``start_ip_int``, ``end_ip_int`` and ``country``.
        ips: Sequence of IPv4 address strings to check.
        log_file_path: Path of the log file to (over)write.
        batch_size: Number of addresses sent per request.

    Returns:
        A ``(match_count, processed)`` tuple.
    """
    request_timeout = 30  # seconds; without it a stalled connection blocks forever
    match_count = 0
    processed = 0
    with open(log_file_path, 'w') as log_file:
        for batch_start in range(0, len(ips), batch_size):
            batch_ips = ips[batch_start:batch_start + batch_size]
            try:
                response = requests.post(
                    'http://ip-api.com/batch?fields=status,countryCode,query',
                    json=batch_ips,
                    timeout=request_timeout,
                )
            except requests.RequestException as e:
                print(f"Request failed: {e}")
                time.sleep(60 / requests_per_minute)
                continue

            request_ok = response.status_code == 200
            if not request_ok:
                print(f"Request failed with status code: {response.status_code}")

            # Honour the rate limit before doing anything else, including
            # after a failed request.
            time.sleep(_ip_api_rate_limit_delay(response.headers))
            if not request_ok:
                continue

            try:
                batch_results = response.json()
            except ValueError as e:
                print(f"Error parsing JSON response: {e}")
                continue
            if not isinstance(batch_results, list):
                print(f"Unexpected response payload: {batch_results!r}")
                continue

            for ip, res in zip(batch_ips, batch_results):
                processed += 1
                if isinstance(res, dict) and res.get('status') == 'success':
                    ip_int = int(ipaddress.IPv4Address(ip))
                    matching_rows = df[(df['start_ip_int'] <= ip_int) & (df['end_ip_int'] >= ip_int)]
                    if not matching_rows.empty:
                        expected_country_code = matching_rows.iloc[0]['country']
                    else:
                        expected_country_code = 'N/A'
                    actual_country_code = res.get('countryCode', 'N/A')
                    match = 'O' if expected_country_code == actual_country_code else 'X'
                    if match == 'O':
                        match_count += 1
                else:
                    expected_country_code = 'N/A'
                    actual_country_code = 'N/A'
                    match = 'X'
                result_line = f"{ip}, {expected_country_code}, {actual_country_code}, {match}"
                log_file.write(result_line + '\n')
                log_file.flush()
                print(f"Processed {processed}/{len(ips)} IPs")
    return match_count, processed
def main():
    """Run the whole cross-check and print the resulting accuracies.

    Steps: load the IPv4 ranges from ``source_file_path``, derive a sample
    size for a 95 % confidence level and a 5 % margin of error, draw that
    many random addresses, check them against the IPFire ``location.db`` and
    then against IP-API, and print the accuracy computed from each log file.
    """
    # Load the reference ranges and size the sample.
    reference = ReadSourceCSVfile(source_file_path)
    record_count = len(reference)
    print(f"Total number of IPv4 records in the data source: {record_count}")

    sample_size = calculate_sample_size(record_count, 95, 0.05)
    print(f"Sampling size: {sample_size}")

    candidates = generate_random_ips(reference, sample_size)

    # Cross-check against the IPFire location database.
    ipfire_db = LocationDatabase('location.db')
    check_country_with_ipfire_db(reference, candidates, log_file_path_ipfire, ipfire_db)
    print(
        "Accuracy (Cross-checked w/IPFire location.db): "
        f"{calculate_accuracy(log_file_path_ipfire)}%"
    )

    # Cross-check against the IP-API HTTP service.
    check_country_with_ip_api(reference, candidates, log_file_path_IP_API)
    print(
        "Accuracy (Cross-checked w/IP-API.com HTTP-API): "
        f"{calculate_accuracy(log_file_path_IP_API)}%"
    )


if __name__ == "__main__":
    main()