import math
from typing import Union

from PIL import Image
from transformers.feature_extraction_utils import BatchFeature
from transformers.image_utils import ImageInput, make_nested_list_of_images
from transformers.image_transforms import to_pil_image
from transformers.processing_utils import (
    ImagesKwargs,
    ProcessingKwargs,
    ProcessorMixin,
    Unpack,
)
from transformers.tokenization_utils_base import BatchEncoding, PreTokenizedInput, TextInput
from transformers.utils import logging


logger = logging.get_logger(__name__)


def round_by_factor(number: float, factor: int) -> int:
    """Returns the closest integer to 'number' that is divisible by 'factor'."""
    return round(number / factor) * factor


def ceil_by_factor(number: float, factor: int) -> int:
    """Returns the smallest integer greater than or equal to 'number' that is divisible by 'factor'."""
    return math.ceil(number / factor) * factor


def floor_by_factor(number: float, factor: int) -> int:
    """Returns the largest integer less than or equal to 'number' that is divisible by 'factor'."""
    return math.floor(number / factor) * factor

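# Illustrative only (not part of the original module): how the factor helpers behave for a
# hypothetical factor of 32. round_by_factor snaps to the nearest multiple, ceil_by_factor
# rounds up, floor_by_factor rounds down:
#   round_by_factor(250, 32) == 256   # 250 / 32 = 7.8125, rounds to 8 * 32
#   ceil_by_factor(250, 32)  == 256   # ceil(7.8125) == 8, gives 8 * 32
#   floor_by_factor(250, 32) == 224   # floor(7.8125) == 7, gives 7 * 32
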
def find_closest_aspect_ratio(
    aspect_ratio: float,
    target_ratios: list[tuple[int, int]],
    width: int,
    height: int,
    image_size: int,
) -> tuple[int, int]:
    """Find the closest aspect ratio from target_ratios to match the input aspect ratio.

    Args:
        aspect_ratio: The aspect ratio to match (width/height).
        target_ratios: List of possible aspect ratios as tuples of (width, height) integers.
        width: Original image width in pixels.
        height: Original image height in pixels.
        image_size: Base size for calculating target area.

    Returns:
        tuple[int, int]: The best matching ratio as (width, height) integers.
    """
    best_ratio_diff = float("inf")
    best_ratio = (1, 1)
    area = width * height

    for ratio in target_ratios:
        target_aspect_ratio = ratio[0] / ratio[1]
        ratio_diff = abs(aspect_ratio - target_aspect_ratio)

        if ratio_diff < best_ratio_diff:
            best_ratio_diff = ratio_diff
            best_ratio = ratio
        elif ratio_diff == best_ratio_diff:
            target_area = image_size * image_size * ratio[0] * ratio[1]
            if area > 0.5 * target_area:
                best_ratio = ratio

    return best_ratio

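# Illustrative only: for a 1600x800 image (aspect ratio 2.0), candidate ratios
# [(1, 1), (1, 2), (2, 1)] and image_size=512 (hypothetical values), the ratio diffs are
# 1.0, 1.5 and 0.0 respectively, so find_closest_aspect_ratio returns (2, 1), i.e. a grid
# of two tiles side by side. When several ratios tie, the larger grid wins if the image
# covers more than half of that grid's target area.
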
class Lfm2VlImagesKwargs(ImagesKwargs, total=False):
    return_row_col_info: bool | None
    max_image_size: dict[str, int] | None


class Lfm2VlProcessorKwargs(ProcessingKwargs, total=False):
    images_kwargs: Lfm2VlImagesKwargs

    _defaults = {
        "text_kwargs": {
            "add_special_tokens": False,
            "padding": False,
            "is_split_into_words": False,
        },
        "images_kwargs": {
            "do_resize": False,
        },
    }

class Lfm2VlProcessor(ProcessorMixin):
    r"""
    Constructs a Lfm2Vl processor which wraps a Lfm2Tokenizer tokenizer and a Siglip2 image processor into a single processor.

    [`Lfm2VlProcessor`] offers all the functionalities of [`Siglip2ImageProcessor`] and [`Lfm2Tokenizer`].

    Args:
        image_processor (`Siglip2ImageProcessor`):
            An instance of [`Siglip2ImageProcessor`]. The image processor is a required input.
        tokenizer (`PreTrainedTokenizerBase`):
            An instance of [`PreTrainedTokenizerBase`]. This should correspond with the model's text model. The tokenizer is a required input.
    """

    attributes = ["image_processor", "tokenizer"]
    image_processor_class = "Siglip2ImageProcessor"
    tokenizer_class = "AutoTokenizer"

    def __init__(
        self,
        image_processor,
        tokenizer,
        chat_template: str,
        use_image_special_tokens: bool,
        downsample_factor: int,
        do_image_splitting: bool,
        min_tiles: int,
        max_tiles: int,
        use_thumbnail: bool,
        min_image_tokens: int,
        max_image_tokens: int,
        encoder_patch_size: int,
        tile_size: int,
        max_pixels_tolerance: float,
        max_num_patches: int,
        auto_map: dict[str, str] | None = None,
        **kwargs,
    ):
        self.image_token = getattr(tokenizer, "image_token", "<image>")
        self.image_token_id = tokenizer.convert_tokens_to_ids(self.image_token)
        self.use_image_special_tokens = use_image_special_tokens
        self.image_start_token = getattr(tokenizer, "image_start_token", "<|image_start|>")
        self.image_end_token = getattr(tokenizer, "image_end_token", "<|image_end|>")
        self.image_thumbnail_token = getattr(tokenizer, "image_thumbnail", "<|img_thumbnail|>")
        self.downsample_factor = downsample_factor
        self.do_image_splitting = do_image_splitting
        self.min_tiles = min_tiles
        self.max_tiles = max_tiles
        self.use_thumbnail = use_thumbnail
        self.min_image_tokens = min_image_tokens
        self.max_image_tokens = max_image_tokens
        self.encoder_patch_size = encoder_patch_size
        self.tile_size = tile_size
        self.max_pixels_tolerance = max_pixels_tolerance
        self.chat_template = chat_template
        self.auto_map = auto_map
        super().__init__(image_processor, tokenizer, chat_template=chat_template, **kwargs)
        self.max_num_patches = max_num_patches
        self.image_processor.max_num_patches = max_num_patches

    def _high_res_preprocessor(
        self,
        image: Image.Image,
        min_tiles,
        max_tiles,
        tile_size,
    ) -> tuple[list[Image.Image], int, int]:
        """Process a high resolution image into patches.

        This method splits a high resolution image into a grid of smaller patches while trying to maintain
        the original aspect ratio. It finds the optimal grid configuration within the specified tile constraints.
        """
        orig_width, orig_height = image.size
        aspect_ratio = orig_width / orig_height

        # All (columns, rows) grids whose tile count lies within [min_tiles, max_tiles], sorted by tile count.
        target_ratios = [
            (w, h)
            for n in range(min_tiles, max_tiles + 1)
            for w in range(1, n + 1)
            for h in range(1, n + 1)
            if min_tiles <= w * h <= max_tiles
        ]
        target_ratios = sorted(set(target_ratios), key=lambda x: x[0] * x[1])

        if not target_ratios:
            return [], 0, 0

        grid_width, grid_height = find_closest_aspect_ratio(
            aspect_ratio, target_ratios, orig_width, orig_height, tile_size
        )

        target_width = tile_size * grid_width
        target_height = tile_size * grid_height
        total_patches = grid_width * grid_height

        # Resize to the exact grid size, then crop out each tile row by row.
        resized_img = image.resize((target_width, target_height))
        patches = []

        for i in range(total_patches):
            col = i % grid_width
            row = i // grid_width
            box = (
                col * tile_size,
                row * tile_size,
                (col + 1) * tile_size,
                (row + 1) * tile_size,
            )
            patch = resized_img.crop(box)
            patches.append(patch)

        num_rows = grid_height
        num_columns = grid_width

        return patches, num_rows, num_columns

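    # Illustrative only: with tile_size=512, min_tiles=1 and max_tiles=10 (hypothetical values),
    # a 1024x2048 image (aspect ratio 0.5) is matched to a 2x4 grid, resized to 1024x2048 and
    # cropped into 8 tiles of 512x512, returned with num_rows=4 and num_columns=2.
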
    def _smart_resize(
        self,
        image: Image.Image,
        downsample_factor: int,
        min_image_tokens: int,
        max_image_tokens: int,
        encoder_patch_size: int,
    ) -> Image.Image:
        """
        Rescales the image so that the following conditions are met:

        1. Both dimensions (height and width) are divisible by 'encoder_patch_size' * 'downsample_factor'.
           This ensures no padding is needed in the downsampling step.
        2. The total number of pixels is within the range ['smart_resize_min_pixels', 'smart_resize_max_pixels'].
        3. The aspect ratio of the image is maintained as closely as possible.
        """
        width, height = image.size

        total_factor = encoder_patch_size * downsample_factor
        smart_resize_min_pixels = min_image_tokens * encoder_patch_size**2 * downsample_factor**2
        smart_resize_max_pixels = max_image_tokens * encoder_patch_size**2 * downsample_factor**2

        h_bar = max(total_factor, round_by_factor(height, total_factor))
        w_bar = max(total_factor, round_by_factor(width, total_factor))

        if h_bar * w_bar > smart_resize_max_pixels:
            beta = math.sqrt((height * width) / smart_resize_max_pixels)
            h_bar = max(total_factor, floor_by_factor(height / beta, total_factor))
            w_bar = max(total_factor, floor_by_factor(width / beta, total_factor))
        elif h_bar * w_bar < smart_resize_min_pixels:
            beta = math.sqrt(smart_resize_min_pixels / (height * width))
            h_bar = ceil_by_factor(height * beta, total_factor)
            w_bar = ceil_by_factor(width * beta, total_factor)

        resized_img = image.resize((w_bar, h_bar))
        return resized_img

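    # Illustrative only, with hypothetical values encoder_patch_size=16, downsample_factor=2
    # (total_factor=32) and max_image_tokens=256: the pixel budget is 256 * 16**2 * 2**2 = 262144.
    # A 1000x700 image first snaps to 992x704 (nearest multiples of 32), which exceeds the budget,
    # so beta = sqrt(700000 / 262144) ~= 1.63 and both sides are floored to multiples of 32 again,
    # giving a 608x416 result (252928 pixels, within budget) at roughly the original aspect ratio.
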
    def _get_tokens_num(self, image_height: int, image_width: int) -> int:
        """Number of image tokens produced for an image of the given size after patching and downsampling."""
        num_patches_height = image_height // self.encoder_patch_size
        num_patches_width = image_width // self.encoder_patch_size

        dwn_num_patches_height = math.ceil(num_patches_height / self.downsample_factor)
        dwn_num_patches_width = math.ceil(num_patches_width / self.downsample_factor)

        return dwn_num_patches_height * dwn_num_patches_width

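    # Illustrative only: with encoder_patch_size=16 and downsample_factor=2 (hypothetical values),
    # a 608x416 image yields 38x26 encoder patches (width x height), downsampled to
    # ceil(38/2) x ceil(26/2) = 19 x 13, i.e. 247 image tokens.
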
    def _is_img_too_large(
        self,
        image: Image.Image,
        max_image_tokens: int,
        encoder_patch_size: int,
        max_pixels_tolerance: float,
    ) -> bool:
        """Check if the image is too large to be processed as one tile."""
        width, height = image.size

        h_bar = max(encoder_patch_size, round_by_factor(height, encoder_patch_size))
        w_bar = max(encoder_patch_size, round_by_factor(width, encoder_patch_size))
        return (
            h_bar * w_bar
            > max_image_tokens * encoder_patch_size**2 * self.downsample_factor**2 * max_pixels_tolerance
        )

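    # Illustrative only: with the same hypothetical values (encoder_patch_size=16, downsample_factor=2,
    # max_image_tokens=256) and max_pixels_tolerance=1.5, a single image may cover up to
    # 256 * 16**2 * 2**2 * 1.5 = 393216 pixels before tiling is considered.
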
    def _resize_and_maybe_split(
        self,
        image: ImageInput,
        downsample_factor: int,
        min_tiles: int,
        max_tiles: int,
        use_thumbnail: bool,
        min_image_tokens: int,
        max_image_tokens: int,
        encoder_patch_size: int,
        tile_size: int,
        max_pixels_tolerance: float,
    ) -> tuple[list[Image.Image], int, int, int, int]:
        """Apply smart resize and maybe split the image into tiles if the image is too large.

        Returns:
            image_tiles: list[Image.Image]
            num_tokens_per_tile: int
            num_rows: int
            num_cols: int
            num_thumbnail_tokens: int
        """
        image = to_pil_image(image)
        do_image_splitting = not (min_tiles == max_tiles == 1)
        if (
            self._is_img_too_large(image, max_image_tokens, encoder_patch_size, max_pixels_tolerance)
            and do_image_splitting
        ):
            image_tiles, num_rows, num_cols = self._high_res_preprocessor(
                image, min_tiles, max_tiles, tile_size
            )
            # Only keep the tiled layout (optionally with a thumbnail of the full image) when the image
            # was actually split into several tiles; otherwise fall back to the single-image resize below.
            if len(image_tiles) > 1:
                num_thumbnail_tokens = 0
                if use_thumbnail:
                    thumbnail_image = self._smart_resize(
                        image,
                        downsample_factor,
                        min_image_tokens,
                        max_image_tokens,
                        encoder_patch_size,
                    )
                    num_thumbnail_tokens = self._get_tokens_num(
                        thumbnail_image.height, thumbnail_image.width
                    )
                    image_tiles.append(thumbnail_image)

                return (
                    image_tiles,
                    self._get_tokens_num(tile_size, tile_size),
                    num_rows,
                    num_cols,
                    num_thumbnail_tokens,
                )

        image = self._smart_resize(
            image,
            downsample_factor,
            min_image_tokens,
            max_image_tokens,
            encoder_patch_size,
        )
        return [image], self._get_tokens_num(image.height, image.width), 1, 1, 0

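    # Illustrative only: an image that fits the token budget comes back as a single entry,
    # e.g. ([resized_image], num_tokens, 1, 1, 0), while an oversized image comes back as
    # ([tile_1, ..., tile_n, optional_thumbnail], tokens_per_tile, num_rows, num_cols, thumbnail_tokens).
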
    def process_vision(
        self,
        text: list[str],
        images: list[list[ImageInput]],
        use_image_special_tokens: bool,
        downsample_factor: int,
        min_tiles: int,
        max_tiles: int,
        use_thumbnail: bool,
        min_image_tokens: int,
        max_image_tokens: int,
        encoder_patch_size: int,
        tile_size: int,
        max_pixels_tolerance: float,
        output_kwargs: dict,
    ):
        if text is not None:
            n_images_in_text = [sample.count(self.image_token) for sample in text]
            n_images_in_images = [len(sublist) for sublist in images]

            if n_images_in_images != n_images_in_text:
                raise ValueError(
                    f"The number of images in the text {n_images_in_text} and images {n_images_in_images} should be the same."
                )

        if text is None:
            # Image-only input: tile or resize each image and run the image processor
            # without expanding any text prompt.
            image_inputs = []
            for sample_images in images:
                sample_tiles = []
                for image in sample_images:
                    image_tiles, *_ = self._resize_and_maybe_split(
                        image,
                        downsample_factor,
                        min_tiles,
                        max_tiles,
                        use_thumbnail,
                        min_image_tokens,
                        max_image_tokens,
                        encoder_patch_size,
                        tile_size,
                        max_pixels_tolerance,
                    )
                    sample_tiles.extend(image_tiles)
                image_inputs.append(sample_tiles)
            image_inputs = self.image_processor(image_inputs, **output_kwargs["images_kwargs"])
            return None, image_inputs

        prompt_strings = []
        image_inputs = []

        for sample_text, sample_images in zip(text, images, strict=False):
            split_sample = sample_text.split(self.image_token)
            sample_tiles = []
            sample_text_with_image_tokens = ""
            for i, image in enumerate(sample_images):
                sample_text_with_image_tokens += split_sample[i]
                if use_image_special_tokens:
                    sample_text_with_image_tokens += self.image_start_token
                (
                    image_tiles,
                    num_tokens_per_tile,
                    num_rows,
                    num_cols,
                    num_thumbnail_tokens,
                ) = self._resize_and_maybe_split(
                    image,
                    downsample_factor,
                    min_tiles,
                    max_tiles,
                    use_thumbnail,
                    min_image_tokens,
                    max_image_tokens,
                    encoder_patch_size,
                    tile_size,
                    max_pixels_tolerance,
                )

                if len(image_tiles) > 1:
                    # Tiled image: emit a grid-position marker followed by the per-tile image tokens.
                    for row in range(num_rows):
                        for col in range(num_cols):
                            if use_image_special_tokens:
                                sample_text_with_image_tokens += f"<|img_row_{row + 1}_col_{col + 1}|>"
                            sample_text_with_image_tokens += self.image_token * num_tokens_per_tile

                    if num_thumbnail_tokens > 0:
                        if use_image_special_tokens:
                            sample_text_with_image_tokens += self.image_thumbnail_token
                        sample_text_with_image_tokens += self.image_token * num_thumbnail_tokens
                else:
                    sample_text_with_image_tokens += self.image_token * num_tokens_per_tile

                if use_image_special_tokens:
                    sample_text_with_image_tokens += self.image_end_token

                sample_tiles.extend(image_tiles)

            # Append the text segment that follows the last image token (or the whole text if there were no images).
            sample_text_with_image_tokens += split_sample[-1]
            prompt_strings.append(sample_text_with_image_tokens)
            image_inputs.append(sample_tiles)

        image_inputs = self.image_processor(image_inputs, **output_kwargs["images_kwargs"])

        return prompt_strings, image_inputs

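    # Illustrative only: for a prompt "Describe <image>" whose image is split into one row of two
    # tiles plus a thumbnail, the expanded prompt looks like
    #   "Describe <|image_start|><|img_row_1_col_1|><image>...<|img_row_1_col_2|><image>...<|img_thumbnail|><image>...<|image_end|>"
    # where each "<image>..." run is the placeholder repeated num_tokens_per_tile (or
    # num_thumbnail_tokens) times.
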
    def __call__(
        self,
        images: ImageInput | list[ImageInput] | list[list[ImageInput]] = None,
        text: Union[TextInput, PreTokenizedInput, list[TextInput], list[PreTokenizedInput]] = None,
        use_image_special_tokens: bool | None = None,
        downsample_factor: int | None = None,
        min_image_tokens: int | None = None,
        max_image_tokens: int | None = None,
        do_image_splitting: bool | None = None,
        min_tiles: int | None = None,
        max_tiles: int | None = None,
        use_thumbnail: bool | None = None,
        encoder_patch_size: int | None = None,
        tile_size: int | None = None,
        max_pixels_tolerance: float | None = None,
        **kwargs: Unpack[Lfm2VlProcessorKwargs],
    ) -> BatchEncoding:
        """
        Processes the input prompts and returns a BatchFeature.

        Example:

        ```python
        >>> import requests
        >>> from transformers import AutoProcessor
        >>> from transformers.image_utils import load_image
        >>> processor = AutoProcessor.from_pretrained("LiquidAI/LFM2-VL-1.6B", trust_remote_code=True)

        >>> url1 = "https://cdn.britannica.com/61/93061-050-99147DCE/Statue-of-Liberty-Island-New-York-Bay.jpg"
        >>> url2 = "https://cdn.britannica.com/59/94459-050-DBA42467/Skyline-Chicago.jpg"

        >>> image1, image2 = load_image(url1), load_image(url2)
        >>> images = [image1, image2]

        >>> conversation = [
        ...     {
        ...         "role": "user",
        ...         "content": [
        ...             {"type": "image", "url": image1},
        ...             {"type": "image", "url": image2},
        ...             {"type": "text", "text": "Compare the two images."},
        ...         ],
        ...     },
        ... ]
        >>> chat_inputs = processor.apply_chat_template(conversation, add_generation_prompt=True, tokenize=False)
        >>> outputs = processor(images=images, text=chat_inputs, return_tensors="pt")
        >>> input_ids = outputs.input_ids
        >>> input_tokens = processor.tokenizer.batch_decode(input_ids)
        >>> print(input_tokens)
        ['user\nCompare the two images.\nassistant\n']
        ```

        Args:
            images (`PIL.Image.Image`, `np.ndarray`, `torch.Tensor`, `list[PIL.Image.Image]`, `list[np.ndarray]`, `list[torch.Tensor]`, *optional*):
                The image or batch of images to be prepared. Each image can be a PIL image, NumPy array or PyTorch
                tensor. If it is of type `list[ImageInput]`, it's assumed that this is for a single prompt i.e. of batch size 1.
            text (`TextInput`, *optional*):
                The sequence or batch of sequences to be encoded.
                Wherever an image token `<image>` is encountered, it is expanded to the proper sequence of image tokens.
            return_tensors (`str | TensorType`, *optional*):
                If set, will return tensors of a particular framework. See [`PreTrainedTokenizerFast.__call__`] for more
                information.
        """
        use_image_special_tokens = (
            use_image_special_tokens if use_image_special_tokens is not None else self.use_image_special_tokens
        )
        downsample_factor = downsample_factor if downsample_factor is not None else self.downsample_factor
        do_image_splitting = do_image_splitting if do_image_splitting is not None else self.do_image_splitting

        min_tiles = min_tiles if min_tiles is not None else self.min_tiles
        max_tiles = max_tiles if max_tiles is not None else self.max_tiles

        if not do_image_splitting:
            min_tiles = 1
            max_tiles = 1
            logger.debug(
                "Image splitting is disabled, setting min_tiles and max_tiles to 1. Set do_image_splitting=True to enable splitting."
            )

        if do_image_splitting and min_tiles > max_tiles:
            raise ValueError("min_tiles must be less than or equal to max_tiles")

        use_thumbnail = use_thumbnail if use_thumbnail is not None else self.use_thumbnail
        min_image_tokens = min_image_tokens if min_image_tokens is not None else self.min_image_tokens
        max_image_tokens = max_image_tokens if max_image_tokens is not None else self.max_image_tokens
        encoder_patch_size = (
            encoder_patch_size if encoder_patch_size is not None else self.encoder_patch_size
        )
        tile_size = tile_size if tile_size is not None else self.tile_size
        max_pixels_tolerance = (
            max_pixels_tolerance if max_pixels_tolerance is not None else self.max_pixels_tolerance
        )

        if text is None and images is None:
            raise ValueError("You must provide one of `text` or `images`.")

        output_kwargs = self._merge_kwargs(
            Lfm2VlProcessorKwargs,
            tokenizer_init_kwargs=self.tokenizer.init_kwargs,
            **kwargs,
        )

        if text is not None:
            if isinstance(text, str):
                text = [text]
            elif not isinstance(text, list) or not isinstance(text[0], str):
                raise ValueError("Invalid input text. Please provide a string, or a list of strings")
            n_images_in_text = sum(sample.count(self.image_token) for sample in text)
            if n_images_in_text > 0 and images is None:
                raise ValueError(
                    f"We detected {n_images_in_text} image tokens in the text but no images were passed"
                )

        inputs = {}

        if images is not None:
            images = make_nested_list_of_images(images)
            text, vision_inputs = self.process_vision(
                text,
                images,
                use_image_special_tokens,
                downsample_factor,
                min_tiles,
                max_tiles,
                use_thumbnail,
                min_image_tokens,
                max_image_tokens,
                encoder_patch_size,
                tile_size,
                max_pixels_tolerance,
                output_kwargs,
            )
            inputs.update(vision_inputs)

        return_tensors = output_kwargs["text_kwargs"].pop("return_tensors", None)

        if text is not None:
            text_inputs = self.tokenizer(text, **output_kwargs["text_kwargs"])
            self._check_special_mm_tokens(text, text_inputs, modalities=["image"])
            inputs.update(text_inputs)

        return BatchFeature(inputs, tensor_type=return_tensors)

    def batch_decode(self, *args, **kwargs):
        """
        This method forwards all its arguments to Lfm2Tokenizer's [`~PreTrainedTokenizer.batch_decode`]. Please
        refer to the docstring of this method for more information.
        """
        return self.tokenizer.batch_decode(*args, **kwargs)

    def decode(self, *args, **kwargs):
        """
        This method forwards all its arguments to Lfm2Tokenizer's [`~PreTrainedTokenizer.decode`]. Please refer to
        the docstring of this method for more information.
        """
        return self.tokenizer.decode(*args, **kwargs)

    @property
    def model_input_names(self):
        tokenizer_input_names = self.tokenizer.model_input_names
        image_processor_input_names = self.image_processor.model_input_names
        return list(dict.fromkeys(image_processor_input_names + tokenizer_input_names))

__all__ = ["Lfm2VlProcessor"]