Detailed interpretation of the DataX Core TransformerRegistry class

TransformerRegistry class, used to register, load and manage data transformers. The following is an explanation of the function of each part:

  • First, this class maintains a map called registedTransformer to store registered transformer information.
  • In the static code block, some native converter instances are built in and registered to registedTransformer.
  • The loadTransformerFromLocalStorage method is used to load a converter from local storage, optionally loading a specified converter. It iterates through the converter files in the specified directory and attempts to load

Per converter, log an error if loading fails.

  • The loadTransformer method is used to load a single transformer. It loads the configuration based on the path to the converter configuration file, and then loads the corresponding class based on the class name in the configuration. Depending on the type of class (whether it inherits from ComplexTransformer or Transformer), register the transformer instance into registedTransformer.
  • The getTransformer method is used to obtain the converter information of the specified name. It is searched from registedTransformer. If it is not found, it may be read from the disk (TODO: According to the comments, this part May be an unimplemented feature).
  • The registTransformer and registComplexTransformer methods are used to register transformers. They check whether the converter name meets the naming rules, and build the converter information into a TransformerInfo instance and add it to registedTransformer.
  • The checkName method is used to check whether the converter name meets the naming rules and determines whether it needs to start with “dx_” based on the isNative parameter.
  • The buildTransformerInfo method is used to build a TransformerInfo instance, which contains the converter’s class loader, whether it is a native converter, and the actual converter instance.
  • The getAllSuportTransformer method returns a list of names of all supported converters.

The main function of this class is to provide converter registration, loading and management functions so that data converters can be dynamically added and used. It plays a very important role in the data processing process, especially the data extraction and transformation stages.

public class TransformerRegistry {

    private static final Logger LOG = LoggerFactory.getLogger(TransformerRegistry.class);
    private static Map<String, TransformerInfo> registeredTransformer = new HashMap<String, TransformerInfo>();

    static {
        //Add some built-in native converters
        // Converters stored locally and loaded from the server will be loaded lazily
        registerTransformer(new SubstrTransformer());
        registerTransformer(new PadTransformer());
        registerTransformer(new ReplaceTransformer());
        registerTransformer(new FilterTransformer());
        registerTransformer(new GroovyTransformer());
        registerTransformer(new DigestTransformer());
    }

    // Load converters from local storage (all converters are loaded by default)
    public static void loadTransformerFromLocalStorage() {
        loadTransformerFromLocalStorage(null);
    }

    // Load converter from local storage (optionally load specific converter)
    public static void loadTransformerFromLocalStorage(List<String> transformers) {
        String[] paths = new File(CoreConstant.DATAX_STORAGE_TRANSFORMER_HOME).list();
        if (null == paths) {
            return;
        }

        for (final String each : paths) {
            try {
                if (transformers == null || transformers.contains(each)) {
                    loadTransformer(each);
                }
            } catch (Exception e) {
                LOG.error(String.format("Skipping the loading of converter (%s), an exception occurred in loadTransformer (%s)", each, e.getMessage()), e);
            }
        }
    }

    //Load the specified converter
    public static void loadTransformer(String each) {
        String transformerPath = CoreConstant.DATAX_STORAGE_TRANSFORMER_HOME + File.separator + each;
        Configuration transformerConfiguration;
        try {
            transformerConfiguration = loadTransFormerConfig(transformerPath);
        } catch (Exception e) {
            LOG.error(String.format("Skipping converter (%s), error loading transformer.json, path = %s", each, transformerPath), e);
            return;
        }

        String className = transformerConfiguration.getString("class");
        if (StringUtils.isEmpty(className)) {
            LOG.error(String.format("Skip converter (%s), class not configured, path = %s, configuration = %s", each, transformerPath, transformerConfiguration.beautify()));
            return;
        }

        String funName = transformerConfiguration.getString("name");
        if (!each.equals(funName)) {
            LOG.warn(String.format("The name of the converter (%s) does not match the name of the transformer.json configuration [%s], the name of the JSON will be ignored, path = %s, configuration = %s", each, funName, transformerPath, transformerConfiguration.beautify()));
        }
        JarLoader jarLoader = new JarLoader(new String[]{transformerPath});

        try {
            Class<?> transformerClass = jarLoader.loadClass(className);
            Object transformer = transformerClass.newInstance();
            if (ComplexTransformer.class.isAssignableFrom(transformer.getClass())) {
                ((ComplexTransformer) transformer).setTransformerName(each);
                registerComplexTransformer((ComplexTransformer) transformer, jarLoader, false);
            } else if (Transformer.class.isAssignableFrom(transformer.getClass())) {
                ((Transformer) transformer).setTransformerName(each);
                registerTransformer((Transformer) transformer, jarLoader, false);
            } else {
                LOG.error(String.format("Error loading Transformer class (%s), path = %s", className, transformerPath));
            }
        } catch (Exception e) {
            // Error converter skipped
            LOG.error(String.format("Skipping converter (%s), error loading Transformer class, path = %s ", each, transformerPath), e);
        }
    }

    private static Configuration loadTransFormerConfig(String transformerPath) {
        return Configuration.from(new File(transformerPath + File.separator + "transformer.json"));
    }

    public static TransformerInfo getTransformer(String transformerName) {
        TransformerInfo result = registeredTransformer.get(transformerName);

        // If result == null, try reading from disk
        // TODO: This part may be an unimplemented function and needs to be developed.

        return result;
    }

    public static synchronized void registTransformer(Transformer transformer) {
        registerTransformer(transformer, null, true);
    }

    public static synchronized void registTransformer(Transformer transformer, ClassLoader classLoader, boolean isNative) {
        checkName(transformer.getTransformerName(), isNative);

        if (registedTransformer.containsKey(transformer.getTransformerName())) {
            throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_DUPLICATE_ERROR, " name=" + transformer.getTransformerName());
        }

        registeredTransformer.put(transformer.getTransformerName(), buildTransformerInfo(new ComplexTransformerProxy(transformer), isNative, classLoader));
    }

    public static synchronized void registComplexTransformer(ComplexTransformer complexTransformer, ClassLoader classLoader, boolean isNative) {
        checkName(complexTransformer.getTransformerName(), isNative);

        if (registedTransformer.containsKey(complexTransformer.getTransformerName())) {
            throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_DUPLICATE_ERROR, " name=" + complexTransformer.getTransformerName());
        }

        registeredTransformer.put(complexTransformer.getTransformerName(), buildTransformerInfo(complexTransformer, isNative, classLoader));
    }

    private static void checkName(String functionName, boolean isNative) {
        boolean checkResult = true;
        if (isNative) {
            if (!functionName.startsWith("dx_")) {
                checkResult = false;
            }
        } else {
            if (functionName.startsWith("dx_")) {
                checkResult = false;
            }
        }

        if (!checkResult) {
            throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_NAME_ERROR, " name=" + functionName + ": isNative=" + isNative);
        }
    }

    private static TransformerInfo buildTransformerInfo(ComplexTransformer complexTransformer, boolean isNative, ClassLoader classLoader) {
        TransformerInfo transformerInfo = new TransformerInfo();
        transformerInfo.setClassLoader(classLoader);
        transformerInfo.setIsNative(isNative);
        transformerInfo.setTransformer(complexTransformer);
        return transformerInfo;
    }

    public static List<String> getAllSuportTransformer() {
        return new ArrayList<String>(registedTransformer.keySet());
    }
}