import json
import logging
import os
from concurrent.futures import ThreadPoolExecutor

import numpy as np
from PIL import Image
from tqdm import tqdm

# Disable Pillow's decompression-bomb size check so very large images can be
# opened; the images are downscaled later by resize_image().
Image.MAX_IMAGE_PIXELS = None


# Upper bound on the total pixel count (width * height) of a processed image.
max_pixels = 2048 * 2048

# Upper bound, in pixels, on the longer side of a processed image.
max_long_size = 4096


def has_alpha(img: Image.Image) -> bool:
    """Return True if ``img`` has an alpha band or is a palette image.

    The check is based on the band names reported by ``img.getbands()``:
    ``'A'`` and ``'a'`` are alpha bands, and ``'P'`` is the palette band.
    Palette images are reported as True unconditionally (presumably because
    a palette may carry transparency -- confirm with the original author).
    """
    return any(band in {'A', 'a', 'P'} for band in img.getbands())


def add_white_background(img: Image.Image) -> Image.Image:
    """Composite ``img`` over an opaque white canvas of the same size.

    The input is converted to RGBA first so that any transparency it carries
    is available to ``Image.alpha_composite``. The returned image is a new
    RGBA image; the input is not modified.
    """
    foreground = img.convert('RGBA')
    # The alpha value is spelled out so the canvas is explicitly opaque.
    background = Image.new('RGBA', foreground.size, (255, 255, 255, 255))
    return Image.alpha_composite(background, foreground)


def resize_image(image: Image.Image) -> Image.Image:
    """Downscale ``image`` so it satisfies both module-level size limits.

    An image is left untouched (the same object is returned) when its longer
    side is at most ``max_long_size`` and its pixel count is at most
    ``max_pixels``. Otherwise it is resized with bicubic resampling using the
    more restrictive of the two scale factors, preserving the aspect ratio
    up to integer truncation.
    """
    width, height = image.size
    longest_side = max(width, height)
    pixel_count = width * height

    # A zero-area image cannot be scaled meaningfully (and would divide by
    # zero below), so it is passed through together with in-limit images.
    within_limits = longest_side <= max_long_size and pixel_count <= max_pixels
    if pixel_count == 0 or within_limits:
        return image

    scale = min(max_long_size / longest_side,
                (max_pixels / pixel_count) ** 0.5)

    # int() truncates toward zero, so an extreme aspect ratio could otherwise
    # yield a 0-pixel side; clamp each dimension to at least one pixel.
    new_width = max(1, int(width * scale))
    new_height = max(1, int(height * scale))

    return image.resize((new_width, new_height), Image.BICUBIC)


def load_image(image_path: str) -> "Image.Image | None":
    """Open, downscale and normalise an image file to RGB.

    Steps: decode the file, shrink it with ``resize_image``, flatten any
    alpha/palette image onto white with ``add_white_background``, and convert
    the result to mode ``"RGB"``.

    Returns the processed image, or ``None`` if any step fails (the failure
    is logged as a warning so unreadable files are not dropped silently).
    """
    try:
        with Image.open(image_path) as source:
            # Force the full decode while the file is still open so that
            # truncated or corrupt data fails here rather than later.
            source.load()
            img = resize_image(source)
            if has_alpha(img):
                img = add_white_background(img)
            if img.mode != "RGB":
                img = img.convert("RGB")
            return img
    except Exception as exc:
        # Best-effort: one bad file must not abort the batch, but
        # KeyboardInterrupt/SystemExit are no longer swallowed.
        logging.getLogger(__name__).warning(
            "Skipping image %s: %s", image_path, exc)
        return None


def get_image_metainfo(img):
    """Summarise the dimensions of ``img``.

    Returns ``None`` when ``img`` is ``None``; otherwise a dict with the keys
    ``'width'``, ``'height'`` and ``'pixel_num'`` (width * height), taken
    from ``img.size``.
    """
    if img is None:
        return None

    width, height = img.size
    return {
        'width': width,
        'height': height,
        'pixel_num': width * height,
    }


def process_image(input_image_path: str, output_image_path: str):
    """Convert one image to WEBP and write a JSON sidecar with its size.

    The image is loaded through ``load_image``; if that fails (returns
    ``None``) nothing is written. Otherwise the image is saved to
    ``output_image_path`` as WEBP (quality 90) and its metadata dict from
    ``get_image_metainfo`` is written next to it with a ``.json`` extension.
    """
    img = load_image(input_image_path)
    if img is None:
        return

    image_metainfo = get_image_metainfo(img)

    # Swap only the final extension. A plain str.replace(".webp", ".json")
    # would also rewrite ".webp" inside directory names, and would leave the
    # path unchanged (so the JSON would overwrite the image) whenever the
    # output path does not end in ".webp".
    output_image_json_path = os.path.splitext(output_image_path)[0] + ".json"

    img.save(output_image_path, "WEBP", quality=90)
    with open(output_image_json_path, 'w', encoding="utf-8") as f:
        json.dump(image_metainfo, f, indent=4)


def get_image_paths(input_dir, output_dir):
    """Yield ``(input_path, output_path)`` pairs for every image under ``input_dir``.

    Files are matched case-insensitively on the extensions .png, .jpg, .jpeg,
    .webp and .bmp. Each output path mirrors the input's location relative to
    ``input_dir`` under ``output_dir`` with the extension replaced by
    ``.webp``. The output directory for a file is created before its pair is
    yielded.

    If ``output_dir`` lies strictly inside ``input_dir`` it is skipped during
    the walk, so files written there by consumers of this generator are not
    picked up again as inputs.
    """
    image_extensions = ('.png', '.jpg', '.jpeg', '.webp', '.bmp')

    def _canonical(path):
        # Comparable form of a path: absolute and case-normalised.
        return os.path.normcase(os.path.abspath(path))

    input_root = _canonical(input_dir)
    output_root = _canonical(output_dir)
    # Prune only a strict sub-directory: when both directories are the same
    # the whole tree must still be walked.
    prune_output = (output_root != input_root
                    and output_root.startswith(os.path.join(input_root, '')))

    for root, dirs, files in os.walk(input_dir):
        if prune_output:
            # In-place edit so os.walk does not descend into the output tree.
            dirs[:] = [d for d in dirs
                       if _canonical(os.path.join(root, d)) != output_root]

        # Every file in ``root`` maps to the same output directory, so it
        # only needs to be created once, and only if an image is found.
        target_dir_ready = False
        for file in files:
            if not file.lower().endswith(image_extensions):
                continue
            input_path = os.path.join(root, file)
            rel_path = os.path.relpath(input_path, input_dir)
            output_path = os.path.join(output_dir,
                                       os.path.splitext(rel_path)[0] + '.webp')
            if not target_dir_ready:
                os.makedirs(os.path.dirname(output_path), exist_ok=True)
                target_dir_ready = True
            yield input_path, output_path


def process_images_with_thread_pool(input_image_dir: str,
                                    output_image_dir: str,
                                    num_threads=16):
    """Convert every image under ``input_image_dir`` using a thread pool.

    Creates ``output_image_dir`` if needed, submits one ``process_image``
    task per pair produced by ``get_image_paths``, and shows a tqdm progress
    bar while waiting for the tasks. An exception raised inside a task is
    re-raised here when its result is collected.

    ``num_threads`` is passed to ``ThreadPoolExecutor`` as ``max_workers``.
    """
    os.makedirs(output_image_dir, exist_ok=True)
    image_paths = get_image_paths(input_image_dir, output_image_dir)

    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = [
            executor.submit(process_image, input_path, output_path)
            for input_path, output_path in image_paths
        ]
        # Wait from the calling thread instead of queueing a second batch of
        # pool tasks whose only job is to block on the first batch.
        for future in tqdm(futures,
                           total=len(futures),
                           desc="Processing images"):
            future.result()


if __name__ == "__main__":
    # Script entry point: convert the images under the hard-coded input
    # directory and write the WEBP/JSON results to the output directory.
    process_images_with_thread_pool(
        input_image_dir=r"20240808\unsplash-research-dataset-lite-latest\test",
        output_image_dir=r"20240808\unsplash-research-dataset-lite-latest\output",
        num_threads=16,
    )