The `TransformerRegistry` class is used to register, load and manage data transformers. The following explains what each part of the class does:
- First, this class maintains a map called `registedTransformer` to store information about the registered transformers.
- In the static code block, several built-in native transformer instances are created and registered in `registedTransformer`.
- The `loadTransformerFromLocalStorage` method loads transformers from local storage, optionally restricted to a specified list of transformers. It iterates through the transformer entries in the storage directory and attempts to load each transformer, logging an error if loading fails.
- The `loadTransformer` method loads a single transformer. It reads the configuration from the transformer's configuration file and then loads the class named in that configuration. Depending on the type of the class (whether it inherits from `ComplexTransformer` or `Transformer`), it registers the transformer instance in `registedTransformer`.
- The `getTransformer` method returns the transformer information for the specified name. It looks the name up in `registedTransformer`. If the name is not found, the information could be read from disk (per the TODO comment in the code, this fallback may not be implemented yet).
- The `registTransformer` and `registComplexTransformer` methods register transformers. They check whether the transformer name meets the naming rules, build the transformer information into a `TransformerInfo` instance, and add it to `registedTransformer`.
- The `checkName` method checks whether the transformer name meets the naming rules: based on the `isNative` parameter, it determines whether the name must start with `dx_` (native transformers) or must not start with it (non-native transformers).
- The `buildTransformerInfo` method builds a `TransformerInfo` instance, which contains the transformer's class loader, whether it is a native transformer, and the actual transformer instance.
- The `getAllSuportTransformer` method returns a list of the names of all supported transformers.
The main purpose of this class is to provide transformer registration, loading and management, so that data transformers can be added and used dynamically. It plays an important role in data processing, especially in the data extraction and transformation stages.
public class TransformerRegistry { private static final Logger LOG = LoggerFactory.getLogger(TransformerRegistry.class); private static Map<String, TransformerInfo> registeredTransformer = new HashMap<String, TransformerInfo>(); static { //Add some built-in native converters // Converters stored locally and loaded from the server will be loaded lazily registerTransformer(new SubstrTransformer()); registerTransformer(new PadTransformer()); registerTransformer(new ReplaceTransformer()); registerTransformer(new FilterTransformer()); registerTransformer(new GroovyTransformer()); registerTransformer(new DigestTransformer()); } // Load converters from local storage (all converters are loaded by default) public static void loadTransformerFromLocalStorage() { loadTransformerFromLocalStorage(null); } // Load converter from local storage (optionally load specific converter) public static void loadTransformerFromLocalStorage(List<String> transformers) { String[] paths = new File(CoreConstant.DATAX_STORAGE_TRANSFORMER_HOME).list(); if (null == paths) { return; } for (final String each : paths) { try { if (transformers == null || transformers.contains(each)) { loadTransformer(each); } } catch (Exception e) { LOG.error(String.format("Skipping the loading of converter (%s), an exception occurred in loadTransformer (%s)", each, e.getMessage()), e); } } } //Load the specified converter public static void loadTransformer(String each) { String transformerPath = CoreConstant.DATAX_STORAGE_TRANSFORMER_HOME + File.separator + each; Configuration transformerConfiguration; try { transformerConfiguration = loadTransFormerConfig(transformerPath); } catch (Exception e) { LOG.error(String.format("Skipping converter (%s), error loading transformer.json, path = %s", each, transformerPath), e); return; } String className = transformerConfiguration.getString("class"); if (StringUtils.isEmpty(className)) { LOG.error(String.format("Skip converter (%s), class not configured, path = %s, configuration 
= %s", each, transformerPath, transformerConfiguration.beautify())); return; } String funName = transformerConfiguration.getString("name"); if (!each.equals(funName)) { LOG.warn(String.format("The name of the converter (%s) does not match the name of the transformer.json configuration [%s], the name of the JSON will be ignored, path = %s, configuration = %s", each, funName, transformerPath, transformerConfiguration.beautify())); } JarLoader jarLoader = new JarLoader(new String[]{transformerPath}); try { Class<?> transformerClass = jarLoader.loadClass(className); Object transformer = transformerClass.newInstance(); if (ComplexTransformer.class.isAssignableFrom(transformer.getClass())) { ((ComplexTransformer) transformer).setTransformerName(each); registerComplexTransformer((ComplexTransformer) transformer, jarLoader, false); } else if (Transformer.class.isAssignableFrom(transformer.getClass())) { ((Transformer) transformer).setTransformerName(each); registerTransformer((Transformer) transformer, jarLoader, false); } else { LOG.error(String.format("Error loading Transformer class (%s), path = %s", className, transformerPath)); } } catch (Exception e) { // Error converter skipped LOG.error(String.format("Skipping converter (%s), error loading Transformer class, path = %s ", each, transformerPath), e); } } private static Configuration loadTransFormerConfig(String transformerPath) { return Configuration.from(new File(transformerPath + File.separator + "transformer.json")); } public static TransformerInfo getTransformer(String transformerName) { TransformerInfo result = registeredTransformer.get(transformerName); // If result == null, try reading from disk // TODO: This part may be an unimplemented function and needs to be developed. 
return result; } public static synchronized void registTransformer(Transformer transformer) { registerTransformer(transformer, null, true); } public static synchronized void registTransformer(Transformer transformer, ClassLoader classLoader, boolean isNative) { checkName(transformer.getTransformerName(), isNative); if (registedTransformer.containsKey(transformer.getTransformerName())) { throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_DUPLICATE_ERROR, " name=" + transformer.getTransformerName()); } registeredTransformer.put(transformer.getTransformerName(), buildTransformerInfo(new ComplexTransformerProxy(transformer), isNative, classLoader)); } public static synchronized void registComplexTransformer(ComplexTransformer complexTransformer, ClassLoader classLoader, boolean isNative) { checkName(complexTransformer.getTransformerName(), isNative); if (registedTransformer.containsKey(complexTransformer.getTransformerName())) { throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_DUPLICATE_ERROR, " name=" + complexTransformer.getTransformerName()); } registeredTransformer.put(complexTransformer.getTransformerName(), buildTransformerInfo(complexTransformer, isNative, classLoader)); } private static void checkName(String functionName, boolean isNative) { boolean checkResult = true; if (isNative) { if (!functionName.startsWith("dx_")) { checkResult = false; } } else { if (functionName.startsWith("dx_")) { checkResult = false; } } if (!checkResult) { throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_NAME_ERROR, " name=" + functionName + ": isNative=" + isNative); } } private static TransformerInfo buildTransformerInfo(ComplexTransformer complexTransformer, boolean isNative, ClassLoader classLoader) { TransformerInfo transformerInfo = new TransformerInfo(); transformerInfo.setClassLoader(classLoader); transformerInfo.setIsNative(isNative); transformerInfo.setTransformer(complexTransformer); return transformerInfo; 
} public static List<String> getAllSuportTransformer() { return new ArrayList<String>(registedTransformer.keySet()); } }